blob: f7d2b929c74ce12e2eb23dbb0517c1f40d25f30f [file] [log] [blame]
Scott Zawalski201d6be2012-09-21 15:56:25 -04001#!/usr/bin/python
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import atexit
7import errno
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Simran Basi7498d202012-07-10 15:21:28 -070010import re
Scott Zawalski201d6be2012-09-21 15:56:25 -040011import sys
Simran Basi7498d202012-07-10 15:21:28 -070012import socket
13import threading
Simran Basi7498d202012-07-10 15:21:28 -070014import xmlrpclib
15
Fang Deng71c4b1f2013-05-20 09:55:04 -070016import rpm_controller
17import rpm_logging_config
Dan Shib7610b52014-05-06 11:09:49 -070018
Simran Basi7498d202012-07-10 15:21:28 -070019from config import rpm_config
20from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
Simran Basi7498d202012-07-10 15:21:28 -070021from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070022
Dan Shib7610b52014-05-06 11:09:49 -070023import common
24from autotest_lib.site_utils.rpm_control_system import utils
25
# Format string for this dispatcher's log file name, read from the GENERAL
# section of the RPM config.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'dispatcher_logname_format')

# Servo-interface mapping file. The configured file name is resolved relative
# to the directory that contains this module.
_MAPPING_FILENAME = rpm_config.get('CiscoPOE', 'servo_interface_mapping_file')
MAPPING_FILE = os.path.join(os.path.dirname(__file__), _MAPPING_FILENAME)
Fang Deng71c4b1f2013-05-20 09:55:04 -070032
Simran Basi7498d202012-07-10 15:21:28 -070033
class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, which hands outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _mapping_last_modified: Modification time of the servo-interface
                                 mapping file as of the last (re)load.
    @var _port: Port assigned to this server instance.
    @var _servo_interface: Servo-interface mapping. Looked up by servo
                           hostname; the first element of each value is the
                           hostname of the POE switch managing that servo.
    @var _worker_dict: Dictionary mapping RPM hostnames to RPMController
                       instances.
    """


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initializes instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        self._frontend_server = rpm_config.get('RPM_INFRASTRUCTURE',
                                               'frontend_uri')
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits. The hook is
        # installed before the registration attempt, so it also runs at exit
        # when registration below fails.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    @staticmethod
    def _describe_socket_error(er):
        """
        Builds a short description of a socket error for log/error messages.

        Not every socket error carries a symbolic errno: address-resolution
        failures use their own negative codes and timeouts may have no errno
        at all. Indexing errno.errorcode directly would raise KeyError for
        those and hide the real failure, so fall back to the error's own text.

        @param er: The socket error that was caught.

        @return: The symbolic errno name (e.g. 'ECONNREFUSED') when one is
                 known, otherwise str(er).
        """
        return errno.errorcode.get(getattr(er, 'errno', None), str(er))


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, or None if the key
                 is not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, dut_hostname, new_state):
        """
        Looks up the appropriate RPMController instance for this DUT and queues
        up the request.

        If the request fails, the controller is asked for the hostname of
        another RPM at the same location and the request is retried there,
        until one succeeds or no further RPM hostname is returned.

        @param dut_hostname: hostname of the DUT whose outlet we are trying to
                             change.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise.
        """
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        rpm_hostname = self._get_rpm_hostname_for_dut(dut_hostname)
        result = False
        while not result and rpm_hostname:
            # Named 'controller' so the imported rpm_controller module is not
            # shadowed inside this method.
            controller = self._get_rpm_controller(rpm_hostname)
            result = controller.queue_request(dut_hostname, new_state)
            if not result:
                # If the request failed, check to see if there is another RPM
                # at this location.
                rpm_hostname = controller.get_next_rpm_hostname()
        return result


    def _get_rpm_hostname_for_dut(self, dut_hostname):
        """
        Private method that determines the hostname of the power device (RPM
        or POE switch) responsible for this DUT.

        Hostnames ending in 'servo' are looked up in the servo-interface
        mapping, which is reloaded first if the utils helper reports a newer
        version. For any other hostname the first 'host...' component (up to
        the next '.') is replaced with 'rpm1'.

        @param dut_hostname: hostname of the DUT whose RPM hostname we want.

        @return: RPM Hostname responsible for this DUT.
                 Return None on failure.
        """
        if not dut_hostname.endswith('servo'):
            # Regular DUTs are managed by RPMs.
            rpm_hostname = re.sub('host[^.]*', 'rpm1', dut_hostname, count=1)
            logging.info('RPM hostname for DUT %s is %s', dut_hostname,
                         rpm_hostname)
            return rpm_hostname

        # Servos are managed by Cisco POE switches.
        reload_info = utils.reload_servo_interface_mapping_if_necessary(
                self._mapping_last_modified)
        if reload_info:
            self._mapping_last_modified, self._servo_interface = reload_info
        switch_if_tuple = self._servo_interface.get(dut_hostname)
        if not switch_if_tuple:
            logging.error('Could not determine POE hostname for %s. '
                          'Please check the servo-interface mapping file.',
                          dut_hostname)
            return None
        rpm_hostname = switch_if_tuple[0]
        logging.info('POE hostname for DUT %s is %s', dut_hostname,
                     rpm_hostname)
        return rpm_hostname


    def _get_rpm_controller(self, rpm_hostname):
        """
        Private method that retrieves the appropriate RPMController instance
        for this RPM Hostname, calling _create_rpm_controller if it does not
        already exist.

        The lookup and the creation happen under a single hold of _lock, so
        concurrent requests for a not-yet-known RPM all end up sharing one
        controller instead of each creating their own.

        @param rpm_hostname: hostname of the RPM whose RPMController we want.

        @return: RPMController instance responsible for this RPM, or None if
                 rpm_hostname is empty/None.
        """
        if not rpm_hostname:
            return None
        # _lock is not re-entrant, so access the dict directly here rather
        # than through _worker_dict_get/_worker_dict_put.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if controller is None:
                controller = self._create_rpm_controller(rpm_hostname)
                self._worker_dict[rpm_hostname] = controller
            return controller


    def _create_rpm_controller(self, rpm_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The decision is based on the second-to-last '-'-separated element of
        the hostname, so a hostname without any '-' raises IndexError.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.

        @return: RPMController instance responsible for this RPM.
        """
        hostname_elements = rpm_hostname.split('-')
        device_id = hostname_elements[-2]
        if device_id == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            return rpm_controller.CiscoPOEController(
                    rpm_hostname, self._servo_interface)

        # The device is an RPM; device_id is its rack id. A rack id that
        # starts with 'rack', digits and then at least one lowercase letter
        # selects the webpowered controller.
        if re.match('rack[0-9]+[a-z]+', device_id):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(rpm_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
258 raise RPMInfrastructureException(err_msg)
259
260
def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    A temporary socket is bound to port 0 to obtain a port number, and the
    xmlrpc server is then created on that port. The temporary socket is always
    closed, including when creating the server raises.

    @return: server,port - server object and the port on which it is
             listening.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        # As in the original ordering, the probe socket stays open until the
        # server has been constructed.
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        sock.close()
    return server, port
279 return server, port
280
281
if __name__ == '__main__':
    # Main entry point used to launch the dispatch server. Creates an instance
    # of RPMDispatcher and registers it to a MultiThreadedXMLRPCServer
    # instance.
    if sys.argv[1:]:
        # This script takes no command line arguments.
        sys.stdout.write('Usage: ./%s, no arguments available.\n' %
                         sys.argv[0])
        sys.exit(1)

    rpm_logging_config.start_log_server(LOG_FILENAME_FORMAT)
    rpm_logging_config.set_up_logging(use_log_server=True,
                                      log_filename_format=LOG_FILENAME_FORMAT)

    # Get the local ip address and set the server to utilize it.
    address = socket.gethostbyname(socket.gethostname())
    server, port = launch_server_on_unused_port()
    rpm_dispatcher = RPMDispatcher(address, port)
    server.register_instance(rpm_dispatcher)
    server.serve_forever()
Scott Zawalski201d6be2012-09-21 15:56:25 -0400299 server.serve_forever()