| # Copyright 2015-2017 ARM Limited |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import unittest |
| from trappy.stats.Topology import Topology |
| from trappy.stats.Trigger import Trigger |
| from trappy.stats.Aggregator import MultiTriggerAggregator |
| |
| import collections |
| import trappy |
| from trappy.base import Base |
| import pandas as pd |
| from pandas.util.testing import assert_series_equal |
| |
| |
class TestTopology(unittest.TestCase):
    """Checks for level registration and lookup on ``Topology``."""

    def _custom_level_topology(self):
        """Build a ``Topology`` holding one hand-registered level.

        Returns a ``(topology, level_name, groups)`` tuple where ``groups``
        is the very list object that was handed to ``add_to_level``.
        """
        groups = [[1, 2], [0, 3, 4, 5]]
        name = "test_level"
        topo = Topology()
        topo.add_to_level(name, groups)
        return topo, name, groups

    def test_add_to_level(self):
        """Test level creation

        A level added with ``add_to_level`` must be reported by
        ``has_level`` and returned unchanged by ``get_level``.
        """
        topo, name, groups = self._custom_level_topology()
        fetched = topo.get_level(name)

        self.assertTrue(topo.has_level(name))
        self.assertEqual(groups, fetched)

    def test_flatten(self):
        """Test Topology: flatten

        ``flatten`` is expected to return every node exactly once,
        in ascending order.
        """
        topo, _, _ = self._custom_level_topology()
        every_node = list(range(6))

        self.assertEqual(every_node, topo.flatten())

    def test_cpu_topology_construction(self):
        """Test CPU Topology Construction

        Passing ``clusters`` to the constructor must create the
        "cluster", "cpu" and "all" levels.
        """
        big = [0, 3, 4, 5]
        little = [1, 2]
        topo = Topology(clusters=[big, little])

        # The cluster level mirrors the constructor argument
        self.assertTrue(topo.has_level("cluster"))
        self.assertEqual([[0, 3, 4, 5], [1, 2]], topo.get_level("cluster"))

        # The cpu level holds one single-element group per cpu
        self.assertTrue(topo.has_level("cpu"))
        self.assertEqual([[cpu] for cpu in range(6)], topo.get_level("cpu"))

        # The "all" level is a single group spanning every cpu
        self.assertEqual([list(range(6))], topo.get_level("all"))

    def test_level_span(self):
        """TestTopology: level_span

        The span of a level is the number of groups registered in it.
        """
        topo, name, _ = self._custom_level_topology()

        self.assertEqual(topo.level_span(name), 2)

    def test_group_index(self):
        """TestTopology: get_index

        ``get_index`` returns the position of a group inside its level.
        """
        topo, name, _ = self._custom_level_topology()

        for position, group in enumerate([[1, 2], [0, 3, 4, 5]]):
            self.assertEqual(topo.get_index(name, group), position)
| |
class BaseTestStats(unittest.TestCase):
    """Shared fixture for the stats test cases.

    ``setUp`` provides ``self._trace``, a bare trace carrying a single
    synthetic "aim_and_fire" event, and ``self.topology``, a topology
    made of two single-cpu clusters.
    """

    def setUp(self):
        """Create the synthetic trace and the topology used by subclasses."""
        # Six samples: identifier 0 for the first three, 1 for the rest,
        # with "fire" and "blank" alternating across all of them.
        timestamps = pd.Series([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], name="Time")
        columns = {
            "identifier": [0] * 3 + [1] * 3,
            "result": ["fire", "blank"] * 3,
        }
        events = pd.DataFrame(columns, index=timestamps)

        self._trace = trappy.BareTrace()
        self._trace.add_parsed_event("aim_and_fire", events)
        self.topology = Topology(clusters=[[0], [1]])
| |
| |
class TestTrigger(BaseTestStats):
    """Tests for ``Trigger.generate`` with plain, function and object filters."""

    class _PreviousValueIs(object):
        """Stateful filter that looks at the value seen on the previous call.

        Calling the instance returns whether the value passed to the
        *previous* call equals ``val_out``; the initial previous value is 0.
        """

        def __init__(self, val_out):
            self.prev_val = 0
            self.val_out = val_out

        def __call__(self, val):
            matched = self.prev_val == self.val_out
            self.prev_val = val
            return matched

    def test_trigger_generation(self):
        """TestTrigger: generate"""
        trigger = Trigger(self._trace,
                          self._trace.aim_and_fire,
                          {"result": "fire"},
                          1,
                          "identifier")

        # identifier 0 fires at 0.1 and 0.3
        fires_on_0 = pd.Series([1, 1],
                               index=pd.Index([0.1, 0.3], name="Time"))
        assert_series_equal(fires_on_0, trigger.generate(0))

        # identifier 1 fires only at 0.5
        fires_on_1 = pd.Series([1], index=pd.Index([0.5], name="Time"))
        assert_series_equal(fires_on_1, trigger.generate(1))

    def test_trigger_with_func(self):
        """Trigger works with a function or lambda as filter"""

        def starts_with_fi(val):
            return val.startswith("fi")

        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"result": starts_with_fi}, value=1,
                          pivot="identifier")

        fire_times = pd.Index([0.5], name="Time")
        assert_series_equal(pd.Series([1], index=fire_times),
                            trigger.generate(1))

        # Same check with a lambda, this time selecting the "blank" rows
        blank_filters = {"result": lambda x: x.startswith("bl")}
        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters=blank_filters, value=1, pivot="identifier")

        blank_times = pd.Index([0.4, 0.6], name="Time")
        assert_series_equal(pd.Series([1, 1], index=blank_times),
                            trigger.generate(1))

    def test_trigger_with_callable_class(self):
        """Trigger works with a callable class as filter"""
        trigger = Trigger(self._trace, self._trace.aim_and_fire,
                          filters={"identifier": self._PreviousValueIs(1)},
                          value=1, pivot="result")

        expected = pd.Series([1], index=pd.Index([0.6], name="Time"))
        assert_series_equal(expected, trigger.generate("blank"))

    def test_filter_prev_values(self):
        """Trigger works with a filter that depends on previous values of the same pivot"""

        # Build an example where a trigger is wanted whenever the
        # identifier stops being 1 for the "blank" result.
        rows = collections.OrderedDict([
            (0.1, ["blank", 1]),
            (0.2, ["fire", 1]),
            (0.3, ["blank", 0]),  # value is no longer 1, trigger
            (0.4, ["blank", 1]),
            (0.5, ["fire", 0]),   # This should NOT trigger
            (0.6, ["blank", 0]),  # value is no longer 1 for blank, trigger
        ])
        events = pd.DataFrame.from_dict(rows, orient="index")
        events.columns = ["result", "identifier"]

        trace = trappy.BareTrace()
        trace.add_parsed_event("aim_and_fire", events)

        trigger = Trigger(trace, trace.aim_and_fire,
                          filters={"identifier": self._PreviousValueIs(1)},
                          value=-1, pivot="result")

        expected = pd.Series([-1, -1], index=[0.3, 0.6])
        assert_series_equal(expected, trigger.generate("blank"))
| |
| |
| |
class TestAggregator(BaseTestStats):
    """Tests for ``MultiTriggerAggregator`` over the shared fixture.

    The fixture trace has "fire" at 0.1 and 0.3 for identifier 0 and at
    0.5 for identifier 1, with "blank" at 0.2, 0.4 and 0.6.  The topology
    is two single-cpu clusters, ``[[0], [1]]``.
    """

    def test_scalar_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger scalar aggfunc"""

        def aggfunc(series):
            return series.sum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace,
                          event_class,
                          filters,
                          value,
                          pivot)

        aggregator = MultiTriggerAggregator([trigger],
                                            self.topology,
                                            aggfunc=aggfunc)

        # There are three "fire" in total
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        self.assertEqual(result, [3.0])

        # There are two "fire" on the first node group and
        # a single "fire" on the second node group at the cluster
        # level which looks like
        # [[0], [1]]
        result = aggregator.aggregate(level="cluster")
        self.assertEqual(result, [2.0, 1.0])

    def test_vector_aggfunc_single_trigger(self):
        """TestAggregator: 1 trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger = Trigger(self._trace, event_class, filters, value, pivot)

        aggregator = MultiTriggerAggregator([trigger],
                                            self.topology,
                                            aggfunc=aggfunc)

        # There are three "fire" in total, at 0.1, 0.3 and 0.5, so the
        # running sum steps up at each of those timestamps.
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series(
            [1.0, 1.0, 2.0, 2.0, 3.0, 3.0],
            index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6]))
        assert_series_equal(result[0], expected_result)

    def test_vector_aggfunc_multiple_trigger(self):
        """TestAggregator: multi trigger vector aggfunc"""

        def aggfunc(series):
            return series.cumsum()

        filters = {
            "result": "fire"
        }

        event_class = self._trace.aim_and_fire
        value = 1
        pivot = "identifier"

        trigger_fire = Trigger(self._trace,
                               event_class,
                               filters,
                               value,
                               pivot)

        filters = {
            "result": "blank"
        }
        value = -1
        trigger_blank = Trigger(self._trace, event_class, filters, value,
                                pivot)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                                            self.topology,
                                            aggfunc=aggfunc)

        # "fire" contributes +1 and "blank" contributes -1; they alternate
        # in the trace, so the running sum flips between 1 and 0.
        # The all level in topology looks like
        # [[0, 1]]
        result = aggregator.aggregate(level="all")
        expected_result = pd.Series(
            [1.0, 0.0, 1.0, 0.0, 1.0, 0.0],
            index=pd.Index([0.1, 0.2, 0.3, 0.4, 0.5, 0.6]))
        assert_series_equal(result[0], expected_result)

    def test_default_aggfunc_multiple_trigger(self):
        """MultiTriggerAggregator with the default aggfunc"""

        trigger_fire = Trigger(self._trace, self._trace.aim_and_fire,
                               filters={"result": "fire"},
                               pivot="identifier", value=1)

        trigger_blank = Trigger(self._trace, self._trace.aim_and_fire,
                                filters={"result": "blank"},
                                pivot="identifier", value=2)

        aggregator = MultiTriggerAggregator([trigger_fire, trigger_blank],
                                            self.topology)

        # One series per cpu is expected: each carries the trigger value
        # (1 for "fire", 2 for "blank") at that cpu's own events and 0
        # at every other timestamp.
        results = aggregator.aggregate(level="cpu")
        expected_results = [
            pd.Series([1., 2., 1., 0., 0., 0.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
            pd.Series([0., 0., 0., 2., 1., 2.],
                      index=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]),
        ]

        # assertEqual, not the deprecated assertEquals alias, which was
        # removed in Python 3.12.
        self.assertEqual(len(results), len(expected_results))
        for result, expected in zip(results, expected_results):
            assert_series_equal(result, expected)