| #!/usr/bin/python |
| |
| # Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| import logging.handlers |
| import os |
| import subprocess |
| import sys |
| import time |
| import urllib2 |
| |
| import_path = os.environ.get("SYSROOT", "") + "/usr/lib/flimflam/test" |
| sys.path.append(import_path) |
| import flimflam |
| |
| class SpeedTester(object): |
| @staticmethod |
| def CommandOutput(command): |
| p = subprocess.Popen(command, stdout=subprocess.PIPE) |
| return p.communicate()[0] |
| |
| def __init__(self): |
| self.keyvals = [] |
| self.flim = None |
| |
| def write_perf_keyval(self, d): |
| self.keyvals.append(d) |
| logging.info(str(d).replace('%', '%%')) |
| |
| def ExtractPerfField(self, field): |
| return [kv[field] |
| for kv in self.keyvals |
| if field in kv] |
| |
| def FetchUrl(self, url_pattern= |
| 'http://testing-chargen.appspot.com/download?size=%d', |
| size=10, |
| label=None): |
| """Fetch the URL, wirte a dictionary of performance data.""" |
| |
| if not label: |
| raise Exception('no label supplied') |
| |
| url = url_pattern % size |
| logging.info('Fetching: %s', url) |
| start_time = time.time() |
| result = urllib2.urlopen(url) |
| bytes_received = len(result.read()) |
| fetch_time = time.time() - start_time |
| if not fetch_time: |
| raise Exception('Fetch took 0 time') |
| |
| if bytes_received != size: |
| raise Exception('asked for %d bytes, got %d' % |
| (size, bytes_received)) |
| |
| self.write_perf_keyval( |
| {'seconds_%s_fetch_time' % label: fetch_time, |
| 'bytes_%s_bytes_received' % label: bytes_received, |
| 'bits_second_%s_speed' % label: 8 * bytes_received / fetch_time} |
| ) |
| |
| def run_once_internal(self, connect_count): |
| logging.info('%s', SpeedTester.CommandOutput(['/sbin/route', '-n'])) |
| |
| for _ in xrange(connect_count): |
| self.FetchUrl(label='3G', size=1 << 19) |
| |
| def run_once(self, connect_count): |
| self.flim = flimflam.FlimFlam() |
| self.run_once_internal(connect_count) |
| speeds = self.ExtractPerfField("bits_second_3G_speed") |
| logging.info('Speeds: %s', speeds) |
| mean = sum(speeds) / len(speeds) |
| |
| print "==============================" |
| print "Mean speed in kbit/second:", mean / 1000 |
| |
| |
| if __name__ == '__main__': |
| logger = logging.getLogger() |
| logger.setLevel(logging.INFO) |
| logging.info('Beginning') |
| tester = SpeedTester() |
| tester.run_once(4) |