| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Copyright (C) 2015, ARM Limited and contributors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| from bart.common.Analyzer import Analyzer |
| import collections |
| from collections import namedtuple |
| import datetime |
| import gzip |
| import json |
| import os |
| import re |
| import time |
| import trappy |
| |
| # Configure logging |
| import logging |
| |
| # Add JSON parsing support |
| from conf import JsonConf |
| |
| import wlgen |
| |
| from devlib import TargetError |
| |
| Experiment = namedtuple('Experiment', ['wload_name', 'wload', |
| 'conf', 'iteration', 'out_dir']) |
| |
| class Executor(): |
| critical_tasks = { |
| 'linux': ['init', 'systemd', 'sh', 'ssh'], |
| 'android': [ |
| 'sh', 'adbd', 'init', |
| 'usb', 'transport', |
| # We don't actually need this task but on Google Pixel it apparently |
| # cannot be frozen, so the cgroup state gets stuck in FREEZING if we |
| # try to freeze it. |
| 'thermal-engine' |
| ] |
| } |
| """Tasks in the system that we can't afford to freeze""" |
| |
| def __init__(self, test_env, experiments_conf): |
| """ |
| Tests Executor |
| |
| A tests executor is a module which support the execution of a |
| configured set of experiments. Each experiment is composed by: |
| - a target configuration |
| - a worload to execute |
| |
| The executor module can be configured to run a set of workloads (wloads) |
| in each different target configuration of a specified set (confs). These |
| wloads and confs can be specified by the "experiments_conf" input |
| dictionary. Each (workload, conf, iteration) tuple is called an |
| "experiment". |
| |
| All the results generated by each experiment will be collected a result |
| folder which is named according to this template: |
| results/<test_id>/<wltype>:<conf>:<wload>/<run_id> |
| where: |
| - <test_id> : the "tid" defined by the experiments_conf, or a timestamp |
| based folder in case "tid" is not specified |
| - <wltype> : the class of workload executed, e.g. rtapp or sched_perf |
| - <conf> : the identifier of one of the specified configurations |
| - <wload> : the identified of one of the specified workload |
| - <run_id> : the progressive execution number from 1 up to the |
| specified iterations |
| |
| After the workloads have been run, the Executor object's `experiments` |
| attribute is a list of Experiment objects. The `out_dir` attribute of |
| these objects can be used to find the results of the experiment. |
| """ |
| |
| # Initialize globals |
| self._default_cgroup = None |
| self._cgroup = None |
| |
| # Setup logging |
| self._log = logging.getLogger('Executor') |
| |
| # Setup test configuration |
| if isinstance(experiments_conf, dict): |
| self._log.info('Loading custom (inline) test configuration') |
| self._experiments_conf = experiments_conf |
| elif isinstance(experiments_conf, str): |
| self._log.info('Loading custom (file) test configuration') |
| json_conf = JsonConf(experiments_conf) |
| self._experiments_conf = json_conf.load() |
| else: |
| raise ValueError( |
| 'experiments_conf must be either a dictionary or a filepath') |
| |
| # Check for mandatory configurations |
| if not self._experiments_conf.get('confs', None): |
| raise ValueError('Configuration error: ' |
| 'missing "conf" definitions') |
| if not self._experiments_conf.get('wloads', None): |
| raise ValueError('Configuration error: ' |
| 'missing "wloads" definitions') |
| |
| self.te = test_env |
| self.target = self.te.target |
| |
| self._iterations = self._experiments_conf.get('iterations', 1) |
| # Compute total number of experiments |
| self._exp_count = self._iterations \ |
| * len(self._experiments_conf['wloads']) \ |
| * len(self._experiments_conf['confs']) |
| |
| self._print_section('Experiments configuration') |
| |
| self._log.info('Configured to run:') |
| |
| self._log.info(' %3d target configurations:', |
| len(self._experiments_conf['confs'])) |
| target_confs = [conf['tag'] for conf in self._experiments_conf['confs']] |
| target_confs = ', '.join(target_confs) |
| self._log.info(' %s', target_confs) |
| |
| self._log.info(' %3d workloads (%d iterations each)', |
| len(self._experiments_conf['wloads']), |
| self._iterations) |
| wload_confs = ', '.join(self._experiments_conf['wloads']) |
| self._log.info(' %s', wload_confs) |
| |
| self._log.info('Total: %d experiments', self._exp_count) |
| |
| self._log.info('Results will be collected under:') |
| self._log.info(' %s', self.te.res_dir) |
| |
| if any(wl['type'] == 'rt-app' |
| for wl in self._experiments_conf['wloads'].values()): |
| self._log.info('rt-app workloads found, installing tool on target') |
| self.te.install_tools(['rt-app']) |
| |
| def run(self): |
| self._print_section('Experiments execution') |
| |
| self.experiments = [] |
| |
| # Run all the configured experiments |
| exp_idx = 0 |
| for tc in self._experiments_conf['confs']: |
| # TARGET: configuration |
| if not self._target_configure(tc): |
| continue |
| for wl_idx in self._experiments_conf['wloads']: |
| # TEST: configuration |
| wload, test_dir = self._wload_init(tc, wl_idx) |
| for itr_idx in range(1, self._iterations + 1): |
| exp = Experiment( |
| wload_name=wl_idx, |
| wload=wload, |
| conf=tc, |
| iteration=itr_idx, |
| out_dir=os.path.join(test_dir, str(itr_idx))) |
| self.experiments.append(exp) |
| |
| # WORKLOAD: execution |
| self._wload_run(exp_idx, exp) |
| exp_idx += 1 |
| self._target_cleanup(tc) |
| |
| self._print_section('Experiments execution completed') |
| self._log.info('Results available in:') |
| self._log.info(' %s', self.te.res_dir) |
| |
| |
| ################################################################################ |
| # Target Configuration |
| ################################################################################ |
| |
| def _cgroups_init(self, tc): |
| self._default_cgroup = None |
| if 'cgroups' not in tc: |
| return True |
| if 'cgroups' not in self.target.modules: |
| raise RuntimeError('CGroups module not available. Please ensure ' |
| '"cgroups" is listed in your target/test modules') |
| self._log.info('Initialize CGroups support...') |
| errors = False |
| for kind in tc['cgroups']['conf']: |
| self._log.info('Setup [%s] CGroup controller...', kind) |
| controller = self.target.cgroups.controller(kind) |
| if not controller: |
| self._log.warning('CGroups controller [%s] NOT available', |
| kind) |
| errors = True |
| return not errors |
| |
| def _setup_kernel(self, tc): |
| # Deploy kernel on the device |
| self.te.install_kernel(tc, reboot=True) |
| # Setup the rootfs for the experiments |
| self._setup_rootfs(tc) |
| |
| def _setup_sched_features(self, tc): |
| if 'sched_features' not in tc: |
| self._log.debug('Scheduler features configuration not provided') |
| return |
| feats = tc['sched_features'].split(",") |
| for feat in feats: |
| self._log.info('Set scheduler feature: %s', feat) |
| self.target.execute('echo {} > /sys/kernel/debug/sched_features'.format(feat), |
| as_root=True) |
| |
| def _setup_rootfs(self, tc): |
| # Initialize CGroups if required |
| self._cgroups_init(tc) |
| # Setup target folder for experiments execution |
| self.te.run_dir = os.path.join( |
| self.target.working_directory, TGT_RUN_DIR) |
| # Create run folder as tmpfs |
| self._log.debug('Setup RT-App run folder [%s]...', self.te.run_dir) |
| self.target.execute('[ -d {0} ] || mkdir {0}'\ |
| .format(self.te.run_dir)) |
| self.target.execute( |
| 'grep schedtest /proc/mounts || '\ |
| ' mount -t tmpfs -o size=1024m {} {}'\ |
| .format('schedtest', self.te.run_dir), |
| as_root=True) |
| # tmpfs mounts have an SELinux context with "tmpfs" as the type (while |
| # other files we create have "shell_data_file"). That prevents non-root |
| # users from creating files in tmpfs mounts. For now, just put SELinux |
| # in permissive mode to get around that. |
| try: |
| # First, save the old SELinux mode |
| self._old_selinux_mode = self.target.execute('getenforce') |
| except TargetError: |
| # Probably the target doesn't have SELinux. No problem. |
| self._old_selinux_mode = None |
| else: |
| self._log.warning('Setting target SELinux in permissive mode') |
| self.target.execute('setenforce 0', as_root=True) |
| |
| def _setup_cpufreq(self, tc): |
| if 'cpufreq' not in tc: |
| self._log.warning('cpufreq governor not specified, ' |
| 'using currently configured governor') |
| return |
| |
| cpufreq = tc['cpufreq'] |
| self._log.info('Configuring all CPUs to use [%s] cpufreq governor', |
| cpufreq['governor']) |
| |
| self.target.cpufreq.set_all_governors(cpufreq['governor']) |
| |
| if 'freqs' in cpufreq: |
| if cpufreq['governor'] != 'userspace': |
| raise ValueError('Must use userspace governor to set CPU freqs') |
| self._log.info(r'%14s - CPU frequencies: %s', |
| 'CPUFreq', str(cpufreq['freqs'])) |
| for cpu, freq in cpufreq['freqs'].iteritems(): |
| self.target.cpufreq.set_frequency(cpu, freq) |
| |
| if 'params' in cpufreq: |
| self._log.info('governor params: %s', str(cpufreq['params'])) |
| for cpu in self.target.list_online_cpus(): |
| self.target.cpufreq.set_governor_tunables( |
| cpu, |
| cpufreq['governor'], |
| **cpufreq['params']) |
| |
| def _setup_cgroups(self, tc): |
| if 'cgroups' not in tc: |
| return True |
| # Setup default CGroup to run tasks into |
| if 'default' in tc['cgroups']: |
| self._default_cgroup = tc['cgroups']['default'] |
| # Configure each required controller |
| if 'conf' not in tc['cgroups']: |
| return True |
| errors = False |
| for kind in tc['cgroups']['conf']: |
| controller = self.target.cgroups.controller(kind) |
| if not controller: |
| self._log.warning('Configuration error: ' |
| '[%s] contoller NOT supported', |
| kind) |
| errors = True |
| continue |
| self._setup_controller(tc, controller) |
| return not errors |
| |
| def _setup_controller(self, tc, controller): |
| kind = controller.kind |
| # Configure each required groups for that controller |
| errors = False |
| for name in tc['cgroups']['conf'][controller.kind]: |
| if name[0] != '/': |
| raise ValueError('Wrong CGroup name [{}]. ' |
| 'CGroups names must start by "/".' |
| .format(name)) |
| group = controller.cgroup(name) |
| if not group: |
| self._log.warning('Configuration error: ' |
| '[%s/%s] cgroup NOT available', |
| kind, name) |
| errors = True |
| continue |
| self._setup_group(tc, group) |
| return not errors |
| |
| def _setup_group(self, tc, group): |
| kind = group.controller.kind |
| name = group.name |
| # Configure each required attribute |
| group.set(**tc['cgroups']['conf'][kind][name]) |
| |
| def _target_configure(self, tc): |
| self._print_header( |
| 'configuring target for [{}] experiments'\ |
| .format(tc['tag'])) |
| self._setup_kernel(tc) |
| self._setup_sched_features(tc) |
| self._setup_cpufreq(tc) |
| return self._setup_cgroups(tc) |
| |
| def _target_conf_flag(self, tc, flag): |
| if 'flags' not in tc: |
| has_flag = False |
| else: |
| has_flag = flag in tc['flags'] |
| self._log.debug('Check if target configuration [%s] has flag [%s]: %s', |
| tc['tag'], flag, has_flag) |
| return has_flag |
| |
| def _target_cleanup(self, tc): |
| if self._old_selinux_mode is not None: |
| self._log.info('Restoring target SELinux mode: %s', |
| self._old_selinux_mode) |
| self.target.execute('setenforce ' + self._old_selinux_mode, |
| as_root=True) |
| |
| ################################################################################ |
| # Workload Setup and Execution |
| ################################################################################ |
| |
| def _wload_cpus(self, wl_idx, wlspec): |
| if not 'cpus' in wlspec['conf']: |
| return None |
| cpus = wlspec['conf']['cpus'] |
| |
| if type(cpus) == list: |
| return cpus |
| if type(cpus) == int: |
| return [cpus] |
| |
| # SMP target (or not bL module loaded) |
| if not hasattr(self.target, 'bl'): |
| if 'first' in cpus: |
| return [ self.target.list_online_cpus()[0] ] |
| if 'last' in cpus: |
| return [ self.target.list_online_cpus()[-1] ] |
| return self.target.list_online_cpus() |
| |
| # big.LITTLE target |
| if cpus.startswith('littles'): |
| if 'first' in cpus: |
| return [ self.target.bl.littles_online[0] ] |
| if 'last' in cpus: |
| return [ self.target.bl.littles_online[-1] ] |
| return self.target.bl.littles_online |
| if cpus.startswith('bigs'): |
| if 'first' in cpus: |
| return [ self.target.bl.bigs_online[0] ] |
| if 'last' in cpus: |
| return [ self.target.bl.bigs_online[-1] ] |
| return self.target.bl.bigs_online |
| raise ValueError('unsupported [{}] "cpus" value for [{}] ' |
| 'workload specification' |
| .format(cpus, wl_idx)) |
| |
| def _wload_task_idxs(self, wl_idx, tasks): |
| if type(tasks) == int: |
| return range(tasks) |
| if tasks == 'cpus': |
| return range(len(self.target.core_names)) |
| if tasks == 'little': |
| return range(len([t |
| for t in self.target.core_names |
| if t == self.target.little_core])) |
| if tasks == 'big': |
| return range(len([t |
| for t in self.target.core_names |
| if t == self.target.big_core])) |
| raise ValueError('unsupported "tasks" value for [{}] RT-App ' |
| 'workload specification' |
| .format(wl_idx)) |
| |
| def _wload_rtapp(self, wl_idx, wlspec, cpus): |
| conf = wlspec['conf'] |
| self._log.debug('Configuring [%s] rt-app...', conf['class']) |
| |
| # Setup a default "empty" task name prefix |
| if 'prefix' not in conf: |
| conf['prefix'] = 'task_' |
| |
| # Setup a default loadref CPU |
| loadref = None |
| if 'loadref' in wlspec: |
| loadref = wlspec['loadref'] |
| |
| if conf['class'] == 'profile': |
| params = {} |
| # Load each task specification |
| for task_name, task in conf['params'].items(): |
| if task['kind'] not in wlgen.__dict__: |
| self._log.error('RTA task of kind [%s] not supported', |
| task['kind']) |
| raise ValueError('unsupported "kind" value for task [{}] ' |
| 'in RT-App workload specification' |
| .format(task)) |
| task_ctor = getattr(wlgen, task['kind']) |
| num_tasks = task.get('tasks', 1) |
| task_idxs = self._wload_task_idxs(wl_idx, num_tasks) |
| for idx in task_idxs: |
| idx_name = str(idx) if len(task_idxs) > 0 else "" |
| task_name_idx = conf['prefix'] + task_name + idx_name |
| params[task_name_idx] = task_ctor(**task['params']).get() |
| |
| rtapp = wlgen.RTA(self.target, |
| wl_idx, calibration = self.te.calibration()) |
| rtapp.conf(kind='profile', params=params, loadref=loadref, |
| cpus=cpus, run_dir=self.te.run_dir) |
| return rtapp |
| |
| if conf['class'] == 'periodic': |
| task_idxs = self._wload_task_idxs(wl_idx, conf['tasks']) |
| params = {} |
| for idx in task_idxs: |
| task = conf['prefix'] + str(idx) |
| params[task] = wlgen.Periodic(**conf['params']).get() |
| rtapp = wlgen.RTA(self.target, |
| wl_idx, calibration = self.te.calibration()) |
| rtapp.conf(kind='profile', params=params, loadref=loadref, |
| cpus=cpus, run_dir=self.te.run_dir) |
| return rtapp |
| |
| if conf['class'] == 'custom': |
| rtapp = wlgen.RTA(self.target, |
| wl_idx, calibration = self.te.calib) |
| rtapp.conf(kind='custom', |
| params=conf['json'], |
| duration=conf['duration'], |
| loadref=loadref, |
| cpus=cpus, run_dir=self.te.run_dir) |
| return rtapp |
| |
| raise ValueError('unsupported \'class\' value for [{}] ' |
| 'RT-App workload specification' |
| .format(wl_idx)) |
| |
| def _wload_perf_bench(self, wl_idx, wlspec, cpus): |
| conf = wlspec['conf'] |
| self._log.debug('Configuring perf_message...') |
| |
| if conf['class'] == 'messaging': |
| perf_bench = wlgen.PerfMessaging(self.target, wl_idx) |
| perf_bench.conf(**conf['params']) |
| return perf_bench |
| |
| if conf['class'] == 'pipe': |
| perf_bench = wlgen.PerfPipe(self.target, wl_idx) |
| perf_bench.conf(**conf['params']) |
| return perf_bench |
| |
| raise ValueError('unsupported "class" value for [{}] ' |
| 'perf bench workload specification' |
| .format(wl_idx)) |
| |
| def _wload_conf(self, wl_idx, wlspec): |
| |
| # CPUS: setup execution on CPUs if required by configuration |
| cpus = self._wload_cpus(wl_idx, wlspec) |
| |
| # CGroup: setup CGroups if requried by configuration |
| self._cgroup = self._default_cgroup |
| if 'cgroup' in wlspec: |
| if 'cgroups' not in self.target.modules: |
| raise RuntimeError('Target not supporting CGroups or CGroups ' |
| 'not configured for the current test configuration') |
| self._cgroup = wlspec['cgroup'] |
| |
| if wlspec['type'] == 'rt-app': |
| return self._wload_rtapp(wl_idx, wlspec, cpus) |
| if wlspec['type'] == 'perf_bench': |
| return self._wload_perf_bench(wl_idx, wlspec, cpus) |
| |
| |
| raise ValueError('unsupported "type" value for [{}] ' |
| 'workload specification' |
| .format(wl_idx)) |
| |
| def _wload_init(self, tc, wl_idx): |
| tc_idx = tc['tag'] |
| |
| # Configure the test workload |
| wlspec = self._experiments_conf['wloads'][wl_idx] |
| wload = self._wload_conf(wl_idx, wlspec) |
| |
| # Keep track of platform configuration |
| test_dir = '{}/{}:{}:{}'\ |
| .format(self.te.res_dir, wload.wtype, tc_idx, wl_idx) |
| os.makedirs(test_dir) |
| self.te.platform_dump(test_dir) |
| |
| # Keep track of kernel configuration and version |
| config = self.target.config |
| with gzip.open(os.path.join(test_dir, 'kernel.config'), 'wb') as fh: |
| fh.write(config.text) |
| output = self.target.execute('{} uname -a'\ |
| .format(self.target.busybox)) |
| with open(os.path.join(test_dir, 'kernel.version'), 'w') as fh: |
| fh.write(output) |
| |
| return wload, test_dir |
| |
| def _wload_run(self, exp_idx, experiment): |
| tc = experiment.conf |
| wload = experiment.wload |
| tc_idx = tc['tag'] |
| |
| self._print_title('Experiment {}/{}, [{}:{}] {}/{}'\ |
| .format(exp_idx, self._exp_count, |
| tc_idx, experiment.wload_name, |
| experiment.iteration, self._iterations)) |
| |
| # Setup local results folder |
| self._log.debug('out_dir set to [%s]', experiment.out_dir) |
| os.system('mkdir -p ' + experiment.out_dir) |
| |
| # Freeze all userspace tasks that we don't need for running tests |
| need_thaw = False |
| if self._target_conf_flag(tc, 'freeze_userspace'): |
| need_thaw = self._freeze_userspace() |
| |
| # FTRACE: start (if a configuration has been provided) |
| if self.te.ftrace and self._target_conf_flag(tc, 'ftrace'): |
| self._log.warning('FTrace events collection enabled') |
| self.te.ftrace.start() |
| |
| # ENERGY: start sampling |
| if self.te.emeter: |
| self.te.emeter.reset() |
| |
| # WORKLOAD: Run the configured workload |
| wload.run(out_dir=experiment.out_dir, cgroup=self._cgroup) |
| |
| # ENERGY: collect measurements |
| if self.te.emeter: |
| self.te.emeter.report(experiment.out_dir) |
| |
| # FTRACE: stop and collect measurements |
| if self.te.ftrace and self._target_conf_flag(tc, 'ftrace'): |
| self.te.ftrace.stop() |
| |
| trace_file = experiment.out_dir + '/trace.dat' |
| self.te.ftrace.get_trace(trace_file) |
| self._log.info('Collected FTrace binary trace:') |
| self._log.info(' %s', |
| trace_file.replace(self.te.res_dir, '<res_dir>')) |
| |
| stats_file = experiment.out_dir + '/trace_stat.json' |
| self.te.ftrace.get_stats(stats_file) |
| self._log.info('Collected FTrace function profiling:') |
| self._log.info(' %s', |
| stats_file.replace(self.te.res_dir, '<res_dir>')) |
| |
| # Unfreeze the tasks we froze |
| if need_thaw: |
| self._thaw_userspace() |
| |
| self._print_footer() |
| |
| def _freeze_userspace(self): |
| if 'cgroups' not in self.target.modules: |
| raise RuntimeError( |
| 'Failed to freeze userspace. Ensure "cgroups" module is listed ' |
| 'among modules in target/test configuration') |
| controllers = [s.name for s in self.target.cgroups.list_subsystems()] |
| if 'freezer' not in controllers: |
| self._log.warning('No freezer cgroup controller on target. ' |
| 'Not freezing userspace') |
| return False |
| |
| exclude = self.critical_tasks[self.te.target.os] |
| self._log.info('Freezing all tasks except: %s', ','.join(exclude)) |
| self.te.target.cgroups.freeze(exclude) |
| return True |
| |
| |
| def _thaw_userspace(self): |
| self._log.info('Un-freezing userspace tasks') |
| self.te.target.cgroups.freeze(thaw=True) |
| |
| ################################################################################ |
| # Utility Functions |
| ################################################################################ |
| |
| def _print_section(self, message): |
| self._log.info('') |
| self._log.info(FMT_SECTION) |
| self._log.info(message) |
| self._log.info(FMT_SECTION) |
| |
| def _print_header(self, message): |
| self._log.info('') |
| self._log.info(FMT_HEADER) |
| self._log.info(message) |
| |
| def _print_title(self, message): |
| self._log.info(FMT_TITLE) |
| self._log.info(message) |
| |
| def _print_footer(self, message=None): |
| if message: |
| self._log.info(message) |
| self._log.info(FMT_FOOTER) |
| |
| |
| ################################################################################ |
| # Globals |
| ################################################################################ |
| |
| # Regular expression for comments |
| JSON_COMMENTS_RE = re.compile( |
| '(^)?[^\S\n]*/(?:\*(.*?)\*/[^\S\n]*|/[^\n]*)($)?', |
| re.DOTALL | re.MULTILINE |
| ) |
| |
| # Target specific paths |
| TGT_RUN_DIR = 'run_dir' |
| |
| # Logging formatters |
| FMT_SECTION = r'{:#<80}'.format('') |
| FMT_HEADER = r'{:=<80}'.format('') |
| FMT_TITLE = r'{:~<80}'.format('') |
| FMT_FOOTER = r'{:-<80}'.format('') |
| |
| # vim :set tabstop=4 shiftwidth=4 expandtab |