blob: 0866da6da233b86703525a4b41cce03d4f51704e [file] [log] [blame]
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from bart.common.Analyzer import Analyzer
import collections
from collections import namedtuple
import datetime
import gzip
import json
import os
import re
import time
import trappy
# Configure logging
import logging
# Add JSON parsing support
from conf import JsonConf
import wlgen
from devlib import TargetError
Experiment = namedtuple('Experiment', ['wload_name', 'wload',
'conf', 'iteration', 'out_dir'])
class Executor():
critical_tasks = {
'linux': ['init', 'systemd', 'sh', 'ssh'],
'android': [
'sh', 'adbd', 'init',
'usb', 'transport',
# We don't actually need this task but on Google Pixel it apparently
# cannot be frozen, so the cgroup state gets stuck in FREEZING if we
# try to freeze it.
'thermal-engine'
]
}
"""Tasks in the system that we can't afford to freeze"""
def __init__(self, test_env, experiments_conf):
"""
Tests Executor
A tests executor is a module which support the execution of a
configured set of experiments. Each experiment is composed by:
- a target configuration
- a worload to execute
The executor module can be configured to run a set of workloads (wloads)
in each different target configuration of a specified set (confs). These
wloads and confs can be specified by the "experiments_conf" input
dictionary. Each (workload, conf, iteration) tuple is called an
"experiment".
All the results generated by each experiment will be collected a result
folder which is named according to this template:
results/<test_id>/<wltype>:<conf>:<wload>/<run_id>
where:
- <test_id> : the "tid" defined by the experiments_conf, or a timestamp
based folder in case "tid" is not specified
- <wltype> : the class of workload executed, e.g. rtapp or sched_perf
- <conf> : the identifier of one of the specified configurations
- <wload> : the identified of one of the specified workload
- <run_id> : the progressive execution number from 1 up to the
specified iterations
After the workloads have been run, the Executor object's `experiments`
attribute is a list of Experiment objects. The `out_dir` attribute of
these objects can be used to find the results of the experiment.
"""
# Initialize globals
self._default_cgroup = None
self._cgroup = None
# Setup logging
self._log = logging.getLogger('Executor')
# Setup test configuration
if isinstance(experiments_conf, dict):
self._log.info('Loading custom (inline) test configuration')
self._experiments_conf = experiments_conf
elif isinstance(experiments_conf, str):
self._log.info('Loading custom (file) test configuration')
json_conf = JsonConf(experiments_conf)
self._experiments_conf = json_conf.load()
else:
raise ValueError(
'experiments_conf must be either a dictionary or a filepath')
# Check for mandatory configurations
if not self._experiments_conf.get('confs', None):
raise ValueError('Configuration error: '
'missing "conf" definitions')
if not self._experiments_conf.get('wloads', None):
raise ValueError('Configuration error: '
'missing "wloads" definitions')
self.te = test_env
self.target = self.te.target
self._iterations = self._experiments_conf.get('iterations', 1)
# Compute total number of experiments
self._exp_count = self._iterations \
* len(self._experiments_conf['wloads']) \
* len(self._experiments_conf['confs'])
self._print_section('Experiments configuration')
self._log.info('Configured to run:')
self._log.info(' %3d target configurations:',
len(self._experiments_conf['confs']))
target_confs = [conf['tag'] for conf in self._experiments_conf['confs']]
target_confs = ', '.join(target_confs)
self._log.info(' %s', target_confs)
self._log.info(' %3d workloads (%d iterations each)',
len(self._experiments_conf['wloads']),
self._iterations)
wload_confs = ', '.join(self._experiments_conf['wloads'])
self._log.info(' %s', wload_confs)
self._log.info('Total: %d experiments', self._exp_count)
self._log.info('Results will be collected under:')
self._log.info(' %s', self.te.res_dir)
if any(wl['type'] == 'rt-app'
for wl in self._experiments_conf['wloads'].values()):
self._log.info('rt-app workloads found, installing tool on target')
self.te.install_tools(['rt-app'])
def run(self):
self._print_section('Experiments execution')
self.experiments = []
# Run all the configured experiments
exp_idx = 0
for tc in self._experiments_conf['confs']:
# TARGET: configuration
if not self._target_configure(tc):
continue
for wl_idx in self._experiments_conf['wloads']:
# TEST: configuration
wload, test_dir = self._wload_init(tc, wl_idx)
for itr_idx in range(1, self._iterations + 1):
exp = Experiment(
wload_name=wl_idx,
wload=wload,
conf=tc,
iteration=itr_idx,
out_dir=os.path.join(test_dir, str(itr_idx)))
self.experiments.append(exp)
# WORKLOAD: execution
self._wload_run(exp_idx, exp)
exp_idx += 1
self._target_cleanup(tc)
self._print_section('Experiments execution completed')
self._log.info('Results available in:')
self._log.info(' %s', self.te.res_dir)
################################################################################
# Target Configuration
################################################################################
def _cgroups_init(self, tc):
self._default_cgroup = None
if 'cgroups' not in tc:
return True
if 'cgroups' not in self.target.modules:
raise RuntimeError('CGroups module not available. Please ensure '
'"cgroups" is listed in your target/test modules')
self._log.info('Initialize CGroups support...')
errors = False
for kind in tc['cgroups']['conf']:
self._log.info('Setup [%s] CGroup controller...', kind)
controller = self.target.cgroups.controller(kind)
if not controller:
self._log.warning('CGroups controller [%s] NOT available',
kind)
errors = True
return not errors
def _setup_kernel(self, tc):
# Deploy kernel on the device
self.te.install_kernel(tc, reboot=True)
# Setup the rootfs for the experiments
self._setup_rootfs(tc)
def _setup_sched_features(self, tc):
if 'sched_features' not in tc:
self._log.debug('Scheduler features configuration not provided')
return
feats = tc['sched_features'].split(",")
for feat in feats:
self._log.info('Set scheduler feature: %s', feat)
self.target.execute('echo {} > /sys/kernel/debug/sched_features'.format(feat),
as_root=True)
def _setup_rootfs(self, tc):
# Initialize CGroups if required
self._cgroups_init(tc)
# Setup target folder for experiments execution
self.te.run_dir = os.path.join(
self.target.working_directory, TGT_RUN_DIR)
# Create run folder as tmpfs
self._log.debug('Setup RT-App run folder [%s]...', self.te.run_dir)
self.target.execute('[ -d {0} ] || mkdir {0}'\
.format(self.te.run_dir))
self.target.execute(
'grep schedtest /proc/mounts || '\
' mount -t tmpfs -o size=1024m {} {}'\
.format('schedtest', self.te.run_dir),
as_root=True)
# tmpfs mounts have an SELinux context with "tmpfs" as the type (while
# other files we create have "shell_data_file"). That prevents non-root
# users from creating files in tmpfs mounts. For now, just put SELinux
# in permissive mode to get around that.
try:
# First, save the old SELinux mode
self._old_selinux_mode = self.target.execute('getenforce')
except TargetError:
# Probably the target doesn't have SELinux. No problem.
self._old_selinux_mode = None
else:
self._log.warning('Setting target SELinux in permissive mode')
self.target.execute('setenforce 0', as_root=True)
def _setup_cpufreq(self, tc):
if 'cpufreq' not in tc:
self._log.warning('cpufreq governor not specified, '
'using currently configured governor')
return
cpufreq = tc['cpufreq']
self._log.info('Configuring all CPUs to use [%s] cpufreq governor',
cpufreq['governor'])
self.target.cpufreq.set_all_governors(cpufreq['governor'])
if 'freqs' in cpufreq:
if cpufreq['governor'] != 'userspace':
raise ValueError('Must use userspace governor to set CPU freqs')
self._log.info(r'%14s - CPU frequencies: %s',
'CPUFreq', str(cpufreq['freqs']))
for cpu, freq in cpufreq['freqs'].iteritems():
self.target.cpufreq.set_frequency(cpu, freq)
if 'params' in cpufreq:
self._log.info('governor params: %s', str(cpufreq['params']))
for cpu in self.target.list_online_cpus():
self.target.cpufreq.set_governor_tunables(
cpu,
cpufreq['governor'],
**cpufreq['params'])
def _setup_cgroups(self, tc):
if 'cgroups' not in tc:
return True
# Setup default CGroup to run tasks into
if 'default' in tc['cgroups']:
self._default_cgroup = tc['cgroups']['default']
# Configure each required controller
if 'conf' not in tc['cgroups']:
return True
errors = False
for kind in tc['cgroups']['conf']:
controller = self.target.cgroups.controller(kind)
if not controller:
self._log.warning('Configuration error: '
'[%s] contoller NOT supported',
kind)
errors = True
continue
self._setup_controller(tc, controller)
return not errors
def _setup_controller(self, tc, controller):
kind = controller.kind
# Configure each required groups for that controller
errors = False
for name in tc['cgroups']['conf'][controller.kind]:
if name[0] != '/':
raise ValueError('Wrong CGroup name [{}]. '
'CGroups names must start by "/".'
.format(name))
group = controller.cgroup(name)
if not group:
self._log.warning('Configuration error: '
'[%s/%s] cgroup NOT available',
kind, name)
errors = True
continue
self._setup_group(tc, group)
return not errors
def _setup_group(self, tc, group):
kind = group.controller.kind
name = group.name
# Configure each required attribute
group.set(**tc['cgroups']['conf'][kind][name])
def _target_configure(self, tc):
self._print_header(
'configuring target for [{}] experiments'\
.format(tc['tag']))
self._setup_kernel(tc)
self._setup_sched_features(tc)
self._setup_cpufreq(tc)
return self._setup_cgroups(tc)
def _target_conf_flag(self, tc, flag):
if 'flags' not in tc:
has_flag = False
else:
has_flag = flag in tc['flags']
self._log.debug('Check if target configuration [%s] has flag [%s]: %s',
tc['tag'], flag, has_flag)
return has_flag
def _target_cleanup(self, tc):
if self._old_selinux_mode is not None:
self._log.info('Restoring target SELinux mode: %s',
self._old_selinux_mode)
self.target.execute('setenforce ' + self._old_selinux_mode,
as_root=True)
################################################################################
# Workload Setup and Execution
################################################################################
def _wload_cpus(self, wl_idx, wlspec):
if not 'cpus' in wlspec['conf']:
return None
cpus = wlspec['conf']['cpus']
if type(cpus) == list:
return cpus
if type(cpus) == int:
return [cpus]
# SMP target (or not bL module loaded)
if not hasattr(self.target, 'bl'):
if 'first' in cpus:
return [ self.target.list_online_cpus()[0] ]
if 'last' in cpus:
return [ self.target.list_online_cpus()[-1] ]
return self.target.list_online_cpus()
# big.LITTLE target
if cpus.startswith('littles'):
if 'first' in cpus:
return [ self.target.bl.littles_online[0] ]
if 'last' in cpus:
return [ self.target.bl.littles_online[-1] ]
return self.target.bl.littles_online
if cpus.startswith('bigs'):
if 'first' in cpus:
return [ self.target.bl.bigs_online[0] ]
if 'last' in cpus:
return [ self.target.bl.bigs_online[-1] ]
return self.target.bl.bigs_online
raise ValueError('unsupported [{}] "cpus" value for [{}] '
'workload specification'
.format(cpus, wl_idx))
def _wload_task_idxs(self, wl_idx, tasks):
if type(tasks) == int:
return range(tasks)
if tasks == 'cpus':
return range(len(self.target.core_names))
if tasks == 'little':
return range(len([t
for t in self.target.core_names
if t == self.target.little_core]))
if tasks == 'big':
return range(len([t
for t in self.target.core_names
if t == self.target.big_core]))
raise ValueError('unsupported "tasks" value for [{}] RT-App '
'workload specification'
.format(wl_idx))
def _wload_rtapp(self, wl_idx, wlspec, cpus):
conf = wlspec['conf']
self._log.debug('Configuring [%s] rt-app...', conf['class'])
# Setup a default "empty" task name prefix
if 'prefix' not in conf:
conf['prefix'] = 'task_'
# Setup a default loadref CPU
loadref = None
if 'loadref' in wlspec:
loadref = wlspec['loadref']
if conf['class'] == 'profile':
params = {}
# Load each task specification
for task_name, task in conf['params'].items():
if task['kind'] not in wlgen.__dict__:
self._log.error('RTA task of kind [%s] not supported',
task['kind'])
raise ValueError('unsupported "kind" value for task [{}] '
'in RT-App workload specification'
.format(task))
task_ctor = getattr(wlgen, task['kind'])
num_tasks = task.get('tasks', 1)
task_idxs = self._wload_task_idxs(wl_idx, num_tasks)
for idx in task_idxs:
idx_name = str(idx) if len(task_idxs) > 0 else ""
task_name_idx = conf['prefix'] + task_name + idx_name
params[task_name_idx] = task_ctor(**task['params']).get()
rtapp = wlgen.RTA(self.target,
wl_idx, calibration = self.te.calibration())
rtapp.conf(kind='profile', params=params, loadref=loadref,
cpus=cpus, run_dir=self.te.run_dir)
return rtapp
if conf['class'] == 'periodic':
task_idxs = self._wload_task_idxs(wl_idx, conf['tasks'])
params = {}
for idx in task_idxs:
task = conf['prefix'] + str(idx)
params[task] = wlgen.Periodic(**conf['params']).get()
rtapp = wlgen.RTA(self.target,
wl_idx, calibration = self.te.calibration())
rtapp.conf(kind='profile', params=params, loadref=loadref,
cpus=cpus, run_dir=self.te.run_dir)
return rtapp
if conf['class'] == 'custom':
rtapp = wlgen.RTA(self.target,
wl_idx, calibration = self.te.calib)
rtapp.conf(kind='custom',
params=conf['json'],
duration=conf['duration'],
loadref=loadref,
cpus=cpus, run_dir=self.te.run_dir)
return rtapp
raise ValueError('unsupported \'class\' value for [{}] '
'RT-App workload specification'
.format(wl_idx))
def _wload_perf_bench(self, wl_idx, wlspec, cpus):
conf = wlspec['conf']
self._log.debug('Configuring perf_message...')
if conf['class'] == 'messaging':
perf_bench = wlgen.PerfMessaging(self.target, wl_idx)
perf_bench.conf(**conf['params'])
return perf_bench
if conf['class'] == 'pipe':
perf_bench = wlgen.PerfPipe(self.target, wl_idx)
perf_bench.conf(**conf['params'])
return perf_bench
raise ValueError('unsupported "class" value for [{}] '
'perf bench workload specification'
.format(wl_idx))
def _wload_conf(self, wl_idx, wlspec):
# CPUS: setup execution on CPUs if required by configuration
cpus = self._wload_cpus(wl_idx, wlspec)
# CGroup: setup CGroups if requried by configuration
self._cgroup = self._default_cgroup
if 'cgroup' in wlspec:
if 'cgroups' not in self.target.modules:
raise RuntimeError('Target not supporting CGroups or CGroups '
'not configured for the current test configuration')
self._cgroup = wlspec['cgroup']
if wlspec['type'] == 'rt-app':
return self._wload_rtapp(wl_idx, wlspec, cpus)
if wlspec['type'] == 'perf_bench':
return self._wload_perf_bench(wl_idx, wlspec, cpus)
raise ValueError('unsupported "type" value for [{}] '
'workload specification'
.format(wl_idx))
def _wload_init(self, tc, wl_idx):
tc_idx = tc['tag']
# Configure the test workload
wlspec = self._experiments_conf['wloads'][wl_idx]
wload = self._wload_conf(wl_idx, wlspec)
# Keep track of platform configuration
test_dir = '{}/{}:{}:{}'\
.format(self.te.res_dir, wload.wtype, tc_idx, wl_idx)
os.makedirs(test_dir)
self.te.platform_dump(test_dir)
# Keep track of kernel configuration and version
config = self.target.config
with gzip.open(os.path.join(test_dir, 'kernel.config'), 'wb') as fh:
fh.write(config.text)
output = self.target.execute('{} uname -a'\
.format(self.target.busybox))
with open(os.path.join(test_dir, 'kernel.version'), 'w') as fh:
fh.write(output)
return wload, test_dir
def _wload_run(self, exp_idx, experiment):
tc = experiment.conf
wload = experiment.wload
tc_idx = tc['tag']
self._print_title('Experiment {}/{}, [{}:{}] {}/{}'\
.format(exp_idx, self._exp_count,
tc_idx, experiment.wload_name,
experiment.iteration, self._iterations))
# Setup local results folder
self._log.debug('out_dir set to [%s]', experiment.out_dir)
os.system('mkdir -p ' + experiment.out_dir)
# Freeze all userspace tasks that we don't need for running tests
need_thaw = False
if self._target_conf_flag(tc, 'freeze_userspace'):
need_thaw = self._freeze_userspace()
# FTRACE: start (if a configuration has been provided)
if self.te.ftrace and self._target_conf_flag(tc, 'ftrace'):
self._log.warning('FTrace events collection enabled')
self.te.ftrace.start()
# ENERGY: start sampling
if self.te.emeter:
self.te.emeter.reset()
# WORKLOAD: Run the configured workload
wload.run(out_dir=experiment.out_dir, cgroup=self._cgroup)
# ENERGY: collect measurements
if self.te.emeter:
self.te.emeter.report(experiment.out_dir)
# FTRACE: stop and collect measurements
if self.te.ftrace and self._target_conf_flag(tc, 'ftrace'):
self.te.ftrace.stop()
trace_file = experiment.out_dir + '/trace.dat'
self.te.ftrace.get_trace(trace_file)
self._log.info('Collected FTrace binary trace:')
self._log.info(' %s',
trace_file.replace(self.te.res_dir, '<res_dir>'))
stats_file = experiment.out_dir + '/trace_stat.json'
self.te.ftrace.get_stats(stats_file)
self._log.info('Collected FTrace function profiling:')
self._log.info(' %s',
stats_file.replace(self.te.res_dir, '<res_dir>'))
# Unfreeze the tasks we froze
if need_thaw:
self._thaw_userspace()
self._print_footer()
def _freeze_userspace(self):
if 'cgroups' not in self.target.modules:
raise RuntimeError(
'Failed to freeze userspace. Ensure "cgroups" module is listed '
'among modules in target/test configuration')
controllers = [s.name for s in self.target.cgroups.list_subsystems()]
if 'freezer' not in controllers:
self._log.warning('No freezer cgroup controller on target. '
'Not freezing userspace')
return False
exclude = self.critical_tasks[self.te.target.os]
self._log.info('Freezing all tasks except: %s', ','.join(exclude))
self.te.target.cgroups.freeze(exclude)
return True
def _thaw_userspace(self):
self._log.info('Un-freezing userspace tasks')
self.te.target.cgroups.freeze(thaw=True)
################################################################################
# Utility Functions
################################################################################
def _print_section(self, message):
self._log.info('')
self._log.info(FMT_SECTION)
self._log.info(message)
self._log.info(FMT_SECTION)
def _print_header(self, message):
self._log.info('')
self._log.info(FMT_HEADER)
self._log.info(message)
def _print_title(self, message):
self._log.info(FMT_TITLE)
self._log.info(message)
def _print_footer(self, message=None):
if message:
self._log.info(message)
self._log.info(FMT_FOOTER)
################################################################################
# Globals
################################################################################
# Regular expression for comments
JSON_COMMENTS_RE = re.compile(
'(^)?[^\S\n]*/(?:\*(.*?)\*/[^\S\n]*|/[^\n]*)($)?',
re.DOTALL | re.MULTILINE
)
# Target specific paths
TGT_RUN_DIR = 'run_dir'
# Logging formatters
FMT_SECTION = r'{:#<80}'.format('')
FMT_HEADER = r'{:=<80}'.format('')
FMT_TITLE = r'{:~<80}'.format('')
FMT_FOOTER = r'{:-<80}'.format('')
# vim :set tabstop=4 shiftwidth=4 expandtab