blob: e2735b4b1b082495f352ee035ce581fe1bc53026 [file] [log] [blame]
Alfie Chen8b8dda42020-11-06 12:39:15 +08001#!/usr/bin/env python3
2#
3# Copyright 2020 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17import time
18from acts import logger
19from acts import utils
20
21
def create(configs):
    """Factory method for PDU controllers.

    Args:
        configs: list of dicts with PDU settings. Each dict must contain a
            'device' key whose value is the name of a PDU class defined in
            this module. The whole dict is handed to that class.

    Returns:
        A list with one PDU object per config, in the same order.

    Raises:
        KeyError: if a config has no 'device' key, or if the named device
            does not exist in this module.
    """
    objs = []
    for config in configs:
        # Report a missing key and an unknown class name separately so a bad
        # testbed config can be fixed without reading this code.
        try:
            device = config['device']
        except KeyError as err:
            raise KeyError(
                "Invalid pdu configuration: missing 'device' key.") from err
        try:
            pdu_class = globals()[device]
        except KeyError as err:
            raise KeyError(
                'Invalid pdu configuration: unknown device {!r}.'.format(
                    device)) from err
        objs.append(pdu_class(config))
    return objs
37
38
def destroy(objs):
    """Teardown hook paired with create(); currently a no-op.

    Args:
        objs: list of PDU objects. It is not used.

    Returns:
        None.
    """
    return None
41
42
class Pdu:
    """Base class for PDU controllers.

    Declares the operations a PDU controller exposes. Every method here
    raises NotImplementedError, so subclasses must override the ones they
    support.
    """

    # Message raised by every method that a subclass has not overridden.
    _BASE_CLASS_ERROR = "Base class: cannot be called directly"

    def on_all(self):
        """Switch every outlet on."""
        raise NotImplementedError(self._BASE_CLASS_ERROR)

    def off_all(self):
        """Switch every outlet off."""
        raise NotImplementedError(self._BASE_CLASS_ERROR)

    def _set_status(self, action, status):
        """Apply an on/off action to the outlets selected by status."""
        raise NotImplementedError(self._BASE_CLASS_ERROR)

    def get_status(self):
        """Report the current state of the outlets."""
        raise NotImplementedError(self._BASE_CLASS_ERROR)

    def turn_on_outlets(self, outlets):
        """Switch the given outlets on."""
        raise NotImplementedError(self._BASE_CLASS_ERROR)

    def turn_off_outlets(self, outlets):
        """Switch the given outlets off."""
        raise NotImplementedError(self._BASE_CLASS_ERROR)
73
74
class PanioPs1158(Pdu):
    """Controller for a Panio PS1158 PDU, driven by curl over HTTP.

    The config dict must provide the keys 'device_id', 'username',
    'password' and 'host'.
    """

    # Number of outlets addressed by the bit string sent to the PDU.
    _NUM_OUTLETS = 8

    def __init__(self, config):
        """Store a copy of the config and create a tagged logger.

        Args:
            config: dict of PDU settings (see the class docstring).
        """
        self.config = config.copy()
        self.device_id = self.config['device_id']
        self.log = logger.create_tagged_trace_logger('pdu_ps1158[{}]'.format(
            self.device_id))

    def on_all(self):
        """Turn on all outlets."""
        self._set_status("on", "1" * self._NUM_OUTLETS)

    def off_all(self):
        """Turn off all outlets."""
        # For the "off" action a "1" bit selects an outlet to switch off.
        self._set_status("off", "1" * self._NUM_OUTLETS)

    def _outlet_mask(self, outlets):
        """Build the per-outlet bit list for a string of outlet numbers.

        Args:
            outlets: A string of outlet numbers, e.g. '378'.

        Returns:
            A list of '1'/'0' strings, one per outlet, with '1' at the
            position of every outlet number found in outlets.
            e.g., '378' returns ['0', '0', '1', '0', '0', '0', '1', '1']
        """
        return ["1" if str(number) in outlets else "0"
                for number in range(1, self._NUM_OUTLETS + 1)]

    def _set_status(self, action, status):
        """Set outlets to on or off.

        Args:
            action: "on" or "off"
            status: 8 bits of 0 or 1. e.g., "11111111". A "1" selects the
                outlet that the action is applied to.
        """
        cmd = "curl http://{}:{}@{}/{}s.cgi?led={}".format(
            self.config['username'],
            self.config['password'],
            self.config['host'],
            action,
            status)
        # NOTE(review): the logged command contains the PDU password.
        self.log.info("PDU cmd: {}".format(cmd))
        utils.start_standing_subprocess(cmd)
        # Fixed wait after issuing the command; presumably gives the PDU
        # time to switch the outlets -- TODO confirm.
        time.sleep(10)

    def get_status(self):
        """Get outlets status.

        Returns:
            A tuple of (outlets_list, outlets_str)
            outlets_list:
                A list indicating the status of the outlets.
                e.g., outlet 1 is ON, returns:
                    ['1', '0', '0', '0', '0', '0', '0', '0']
                e.g., outlets 1 & 8 are ON, returns:
                    ['1', '0', '0', '0', '0', '0', '0', '1']

            outlets_str:
                A string listing the numbers of the outlets that are ON.
                e.g., outlet 1 is ON:
                    returns: '1'
                e.g., outlets 1 & 3 & 8 are ON:
                    returns: '138'

        Raises:
            KeyError: if reading or decoding the curl output fails.
        """
        cmd = "curl http://{}:{}@{}/status.xml".format(
            self.config['username'],
            self.config['password'],
            self.config['host'])
        proc = utils.start_standing_subprocess(cmd)
        time.sleep(1)
        try:
            fields = proc.communicate()[0].decode().split(",")
        except Exception as err:
            # KeyError is kept for callers that already catch it; the
            # original failure is chained for debugging.
            raise KeyError("Fail to get status from PDU.") from err

        # The outlet states are taken from the comma-separated fields
        # starting at index 10, one field per outlet.
        outlets_list = fields[10:10 + self._NUM_OUTLETS]

        # Translate the list of states into the numbers of the ON outlets.
        # e.g.
        # ['1', '0', '0', '0', '0', '0', '0', '0'] turns into '1'
        # ['1', '1', '1', '1', '1', '1', '1', '1'] turns into '12345678'
        outlets_str = "".join(
            str(number)
            for number, state in enumerate(outlets_list, start=1)
            if state == '1')

        return outlets_list, outlets_str

    def turn_on_outlets(self, outlets):
        """Turn specific outlets on.

        All outlets are switched off first, so afterwards only the requested
        outlets are expected to be on. A mismatch with the status read back
        is logged as an error; nothing is raised for it.

        Args:
            outlets: A string of outlet numbers.
            e.g., '1' means outlets status will be: '10000000'
            e.g., '378' means outlets status will be: '00100011'
        """
        self.off_all()
        expect_outlets = self._outlet_mask(outlets)
        self._set_status("on", "".join(expect_outlets))

        # Check if outlets are on as expected.
        actual_outlets, _ = self.get_status()
        self.log.info("Expect outlets : {}".format(expect_outlets))
        self.log.info("Actual outlets : {}".format(actual_outlets))
        if expect_outlets == actual_outlets:
            self.log.info("Outlets are ON as expected")
        else:
            self.log.error("Outlets are not correctly turn on")

    def turn_off_outlets(self, outlets):
        """Turn specific outlets off.

        All outlets are switched on first, so afterwards only the requested
        outlets are expected to be off. A mismatch with the status read back
        is logged as an error; nothing is raised for it.

        Args:
            outlets: A string of outlet numbers.
            e.g., '1' means outlets status will be: '01111111'
            e.g., '378' means outlets status will be: '11011100'
        """
        self.on_all()
        off_mask = self._outlet_mask(outlets)
        self._set_status("off", "".join(off_mask))

        # Check if outlets are off as expected.
        actual_outlets, _ = self.get_status()

        # When turning outlets off, the Panio PS1158 uses "1" to select the
        # outlet to switch off (e.g. offs.cgi?led=00000001 turns off outlet
        # 8), but the reported status is then '11111110'. Invert the mask so
        # it can be compared with the reported status.
        expect_outlets = ["0" if bit == "1" else "1" for bit in off_mask]
        self.log.info("Expect outlets : {}".format(expect_outlets))
        self.log.info("Actual outlets : {}".format(actual_outlets))
        if expect_outlets == actual_outlets:
            self.log.info("Outlets are OFF as expected")
        else:
            self.log.error("Outlets are not correctly turn off")