blob: 9d898c466c8099999434e8f0ed155b2551941e17 [file] [log] [blame]
# Copyright 2015 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import csv
import logging
import collections
from devlib.utils.types import numeric
# Channel modes describe what sort of measurement an instrument supports.
# Every mode value must be a distinct power of 2 (i.e. a single bit).
INSTANTANEOUS = 1 << 0
CONTINUOUS = 1 << 1
class MeasurementType(tuple):
    """Immutable description of a kind of measurement.

    The instance is stored as the tuple ``(name, units, category)`` but is
    only meant to be accessed through the ``name``, ``units`` and
    ``category`` properties; indexing is deliberately disabled.

    Comparison, ordering and hashing are all based on ``name`` alone, so a
    measurement type compares equal to (and sorts alongside) either another
    ``MeasurementType`` with the same name or the bare name itself.  The
    rich comparison methods are spelled out explicitly because ``__cmp__``
    and the ``cmp`` builtin are ignored/absent on Python 3.
    """

    __slots__ = []

    def __new__(cls, name, units, category=None):
        return tuple.__new__(cls, (name, units, category))

    @property
    def name(self):
        return tuple.__getitem__(self, 0)

    @property
    def units(self):
        return tuple.__getitem__(self, 1)

    @property
    def category(self):
        return tuple.__getitem__(self, 2)

    def __getitem__(self, item):
        # Positional access is not part of the interface; use the properties.
        raise TypeError()

    @staticmethod
    def _name_of(other):
        """Return the value ``other`` should be compared by (its name, if
        it is a ``MeasurementType``; otherwise ``other`` itself)."""
        if isinstance(other, MeasurementType):
            return other.name
        return other

    def __eq__(self, other):
        return self.name == self._name_of(other)

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__, so define it too.
        return self.name != self._name_of(other)

    def __lt__(self, other):
        return self.name < self._name_of(other)

    def __le__(self, other):
        return self.name <= self._name_of(other)

    def __gt__(self, other):
        return self.name > self._name_of(other)

    def __ge__(self, other):
        return self.name >= self._name_of(other)

    def __hash__(self):
        # Must agree with __eq__: equal to its name, so hash like its name.
        # (Defining __eq__ would otherwise make the class unhashable on
        # Python 3.)
        return hash(self.name)

    def __cmp__(self, other):
        # Still used by Python 2's cmp() builtin; written without calling
        # cmp() so that an explicit call also works on Python 3.
        other = self._name_of(other)
        return (self.name > other) - (self.name < other)

    def __str__(self):
        return self.name

    __repr__ = __str__
# Standard measures, declared as (name, units[, category]).
_measurement_types = [
    # no category
    MeasurementType('time', units='seconds'),
    MeasurementType('temperature', units='degrees'),

    # power/energy
    MeasurementType('power', units='watts', category='power/energy'),
    MeasurementType('voltage', units='volts', category='power/energy'),
    MeasurementType('current', units='amps', category='power/energy'),
    MeasurementType('energy', units='joules', category='power/energy'),

    # data transfer
    MeasurementType('tx', units='bytes', category='data transfer'),
    MeasurementType('rx', units='bytes', category='data transfer'),
    MeasurementType('tx/rx', units='bytes', category='data transfer'),
]

# Lookup table from measurement name to its MeasurementType.
MEASUREMENT_TYPES = dict(
    (mtype.name, mtype) for mtype in _measurement_types
)
class Measurement(object):
    """A single measured value together with the channel it came from.

    ``channel`` is expected to expose ``site``, ``kind`` and ``units``
    attributes (as ``InstrumentChannel`` does).

    Measurements compare and order by ``value``, both against other
    ``Measurement`` instances and against bare values.  The rich comparison
    methods are defined explicitly because ``__cmp__`` and the ``cmp``
    builtin are ignored/absent on Python 3.
    """

    __slots__ = ['value', 'channel']

    def __init__(self, value, channel):
        self.value = value
        self.channel = channel

    @property
    def name(self):
        """Label of the form ``<site>_<kind>`` taken from the channel."""
        return '{}_{}'.format(self.channel.site, self.channel.kind)

    @property
    def units(self):
        """Units reported by the channel."""
        return self.channel.units

    @staticmethod
    def _value_of(other):
        """Return the value ``other`` should be compared by."""
        if isinstance(other, Measurement):
            return other.value
        return other

    def __eq__(self, other):
        return self.value == self._value_of(other)

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__, so define it too.
        return self.value != self._value_of(other)

    def __lt__(self, other):
        return self.value < self._value_of(other)

    def __le__(self, other):
        return self.value <= self._value_of(other)

    def __gt__(self, other):
        return self.value > self._value_of(other)

    def __ge__(self, other):
        return self.value >= self._value_of(other)

    # Defining __eq__ removes the default hash on Python 3; keep the
    # identity-based hash the class has always had so that instances remain
    # usable in sets and as dict keys exactly as before.
    __hash__ = object.__hash__

    def __cmp__(self, other):
        # Still used by Python 2's cmp() builtin; written without calling
        # cmp() so that an explicit call also works on Python 3.
        other = self._value_of(other)
        return (self.value > other) - (self.value < other)

    def __str__(self):
        if self.units:
            return '{}: {} {}'.format(self.name, self.value, self.units)
        else:
            return '{}: {}'.format(self.name, self.value)

    __repr__ = __str__
class MeasurementsCsv(object):
    """Reader for a CSV file of measurements.

    The first row of the file is treated as headings and skipped.  Every
    following row is converted cell-by-cell with ``numeric`` and paired
    positionally with ``channels`` to produce ``Measurement`` objects.

    The file is opened when the object is created and stays open until
    ``close()`` is called; the object can also be used as a context manager
    so the handle is released deterministically.

    :param path: Path of the CSV file to read.
    :param channels: Sequence of channels, in column order.
    """

    def __init__(self, path, channels):
        self.path = path
        self.channels = channels
        if str is bytes:
            # Python 2: the csv module expects a binary-mode file.
            self._fh = open(path, 'rb')
        else:
            # Python 3: the csv module expects a text-mode file opened with
            # newline translation disabled.
            self._fh = open(path, 'r', newline='')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Close the underlying file; safe to call more than once."""
        self._fh.close()

    def measurements(self):
        """Return all rows as a list of lists of ``Measurement`` objects."""
        return list(self.itermeasurements())

    def itermeasurements(self):
        """Yield one list of ``Measurement`` objects per data row.

        Iteration always restarts from the beginning of the file.
        """
        self._fh.seek(0)
        reader = csv.reader(self._fh)
        # Skip the headings.  The default argument keeps an empty file from
        # raising StopIteration inside the generator (a RuntimeError on
        # Python 3.7+); it simply yields nothing instead.
        next(reader, None)
        for row in reader:
            # Convert the whole row eagerly, as Python 2's map() did.
            values = [numeric(cell) for cell in row]
            yield [Measurement(v, c) for (v, c) in zip(values, self.channels)]
class InstrumentChannel(object):
    """A single source of measurements exposed by an instrument.

    :param name: Name of the channel.
    :param site: The site the channel measures; combined with the
                 measurement kind to form ``label``.
    :param measurement_type: Either a ``MeasurementType`` instance or the
                             name of one of the standard types registered
                             in ``MEASUREMENT_TYPES``.
    :param attrs: Any additional keyword arguments are set as attributes
                  on the channel.
    :raises ValueError: If ``measurement_type`` is a name that is not in
                        ``MEASUREMENT_TYPES``.
    """

    @property
    def label(self):
        """Identifier of the form ``<site>_<kind>``."""
        return '{}_{}'.format(self.site, self.kind)

    @property
    def kind(self):
        """Name of the channel's measurement type."""
        return self.measurement_type.name

    @property
    def units(self):
        """Units of the channel's measurement type."""
        return self.measurement_type.units

    def __init__(self, name, site, measurement_type, **attrs):
        self.name = name
        self.site = site
        if isinstance(measurement_type, MeasurementType):
            self.measurement_type = measurement_type
        else:
            try:
                self.measurement_type = MEASUREMENT_TYPES[measurement_type]
            except KeyError:
                raise ValueError('Unknown measurement type: {}'.format(measurement_type))
        # items() rather than iteritems(): the latter only exists on
        # Python 2, and the dict is small enough that it makes no difference.
        for atname, atvalue in attrs.items():
            setattr(self, atname, atvalue)

    def __str__(self):
        if self.name == self.label:
            return 'CHAN({})'.format(self.label)
        else:
            return 'CHAN({}, {})'.format(self.name, self.label)

    __repr__ = __str__
try:
    _STRING_TYPES = (basestring,)  # Python 2: covers both str and unicode
except NameError:
    _STRING_TYPES = (str,)  # Python 3: basestring no longer exists


class Instrument(object):
    """Base class for instruments that collect measurements from a target.

    Subclasses register the channels they provide with ``add_channel()`` and
    override the measurement hooks (``take_measurement()`` and/or
    ``start()``/``stop()``/``get_data()``); the implementations here do
    nothing.

    ``channels`` maps channel labels to channel objects in the order they
    were added; ``active_channels`` is the selection made by the most recent
    ``reset()``.
    """

    # NOTE(review): presumably a bitwise OR of the channel mode constants
    # (INSTANTANEOUS, CONTINUOUS) set by subclasses -- confirm.
    mode = 0

    def __init__(self, target):
        self.target = target
        self.logger = logging.getLogger(self.__class__.__name__)
        self.channels = collections.OrderedDict()
        self.active_channels = []
        self.sample_rate_hz = None

    # channel management

    def list_channels(self):
        """Return a list of all registered channels, in insertion order."""
        # list() so callers get a real list on Python 3 as well, where
        # values() returns a view.
        return list(self.channels.values())

    def get_channels(self, measure):
        """Return the channels whose kind matches ``measure``.

        ``measure`` may be a measurement name or any object with a ``name``
        attribute (e.g. a ``MeasurementType``).
        """
        if hasattr(measure, 'name'):
            measure = measure.name
        return [c for c in self.list_channels() if c.kind == measure]

    def add_channel(self, site, measure, name=None, **attrs):
        """Create a channel and register it under its label.

        If ``name`` is not given it defaults to ``<site>_<measure>``.
        """
        if name is None:
            name = '{}_{}'.format(site, measure)
        chan = InstrumentChannel(name, site, measure, **attrs)
        self.channels[chan.label] = chan

    # initialization and teardown

    def setup(self, *args, **kwargs):
        pass

    def teardown(self):
        pass

    def reset(self, sites=None, kinds=None, channels=None):
        """Select the channels that will be active.

        With no arguments every channel is activated, sorted by label.
        Otherwise the selection is made up of:

        - the channels named in ``channels`` (by label), in the given order;
        - followed by the channels matching the ``sites``/``kinds`` filter,
          in registration order.  The filter is applied when ``sites`` or
          ``kinds`` is given, or when no channels were named.

        A channel is never activated twice.

        :param sites: A site or a collection of sites to match.
        :param kinds: A measurement kind or a collection of kinds to match.
        :param channels: Iterable of channel labels to activate explicitly.
        :raises ValueError: If ``channels`` names an unknown channel; in
                            that case the current selection is left as is.
        """
        if kinds is None and sites is None and channels is None:
            self.active_channels = sorted(self.channels.values(), key=lambda x: x.label)
            return

        if isinstance(sites, _STRING_TYPES):
            sites = [sites]
        if isinstance(kinds, _STRING_TYPES):
            kinds = [kinds]

        # Keyed by label so that a channel selected more than once (named
        # twice, or named and also matched by the filter) appears only once.
        selected = collections.OrderedDict()
        for chan_name in (channels or []):
            try:
                selected[chan_name] = self.channels[chan_name]
            except KeyError:
                msg = 'Unexpected channel "{}"; must be in {}'
                raise ValueError(msg.format(chan_name, list(self.channels.keys())))

        # When channels were named explicitly and no site/kind filter was
        # supplied, the filter below would match *every* channel (both
        # conditions are vacuously true), defeating the explicit selection,
        # so it is skipped in that case.
        if not channels or kinds is not None or sites is not None:
            for label, chan in self.channels.items():
                if (kinds is None or chan.kind in kinds) and \
                        (sites is None or chan.site in sites):
                    selected.setdefault(label, chan)

        self.active_channels = list(selected.values())

    # instantaneous

    def take_measurement(self):
        pass

    # continuous

    def start(self):
        pass

    def stop(self):
        pass

    def get_data(self, outfile):
        pass