blob: 22671a131c22cfa6fd5617e9a5d5da681b692163 [file] [log] [blame]
Scott Zawalski201d6be2012-09-21 15:56:25 -04001#!/usr/bin/python
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import atexit
7import errno
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Simran Basi7498d202012-07-10 15:21:28 -070010import re
Scott Zawalski201d6be2012-09-21 15:56:25 -040011import sys
Simran Basi7498d202012-07-10 15:21:28 -070012import socket
13import threading
Simran Basi7498d202012-07-10 15:21:28 -070014import xmlrpclib
15
Fang Deng71c4b1f2013-05-20 09:55:04 -070016import rpm_controller
17import rpm_logging_config
18import utils
Simran Basi7498d202012-07-10 15:21:28 -070019from config import rpm_config
20from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
Simran Basi7498d202012-07-10 15:21:28 -070021from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070022
# Log file name format for this dispatcher, read from the GENERAL section of
# the RPM config. It is handed to rpm_logging_config.set_up_logging() when
# this file is run as a script.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'dispatcher_logname_format')

# Servo-interface mapping file. The file name comes from the CiscoPOE section
# of the RPM config and is resolved relative to this module's directory.
MAPPING_FILE = os.path.join(os.path.dirname(__file__),
                            rpm_config.get('CiscoPOE',
                                           'servo_interface_mapping_file'))
Fang Deng71c4b1f2013-05-20 09:55:04 -070029
Simran Basi7498d202012-07-10 15:21:28 -070030
class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, who will field out outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _mapping_last_modified: Modification time of the servo-interface
                                 mapping file when the mapping was last
                                 loaded.
    @var _port: Port assigned to this server instance.
    @var _servo_interface: Servo-interface mapping. It is looked up by servo
                           hostname and the first element of each value is
                           the hostname of the POE switch.
    @var _worker_dict: Dictionary mapping RPM hostname's to RPMController
                       instances.
    """

    # A rack element made of 'rack', digits and then letters marks a
    # web-powered RPM. Compiled once instead of on every controller creation.
    _WEBPOWERED_RACK_PATTERN = re.compile(r'rack[0-9]+[a-z]+')


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initialized instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        self._frontend_server = rpm_config.get('RPM_INFRASTRUCTURE',
                                               'frontend_uri')
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    @staticmethod
    def _describe_socket_error(error):
        """
        Builds a short description of a socket error for log messages.

        Indexing errno.errorcode directly raises KeyError when the error
        carries no errno (e.g. a timeout) or an errno that is not in the table
        (e.g. address lookup failures), which would hide the real problem.

        @param error: The socket.error instance that was caught.

        @return: The symbolic errno name (e.g. 'ECONNREFUSED') when one is
                 known, otherwise the string form of the error.
        """
        error_number = getattr(error, 'errno', None)
        return errno.errorcode.get(error_number) or str(error)


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, None if the key is
                 not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, dut_hostname, new_state):
        """
        Looks up the appropriate RPMController instance for this DUT and queues
        up the request.

        If a controller reports failure, the request is retried on the next
        RPM hostname that controller suggests, until one succeeds or there is
        no further hostname to try.

        @param dut_hostname: hostname of the DUT whose outlet we are trying to
                             change.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise.
        """
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        rpm_hostname = self._get_rpm_hostname_for_dut(dut_hostname)
        result = False
        while not result and rpm_hostname:
            # Named 'controller' so the imported rpm_controller module is not
            # shadowed inside this method.
            controller = self._get_rpm_controller(rpm_hostname)
            result = controller.queue_request(dut_hostname, new_state)
            if not result:
                # If the request failed, check to see if there is another RPM
                # at this location.
                rpm_hostname = controller.get_next_rpm_hostname()
        return result


    def _get_rpm_hostname_for_dut(self, dut_hostname):
        """
        Private method that retrieves the hostname of the RPM (or POE switch)
        that controls power for this DUT.

        Hostnames ending in 'servo' are looked up in the servo-interface
        mapping, which is refreshed first when utils reports new data. For any
        other hostname the first 'host...' element (up to the next '.') is
        replaced with 'rpm1'.

        @param dut_hostname: hostname of the DUT whose RPM we want.

        @return: RPM Hostname responsible for this DUT.
                 Return None on failure.
        """
        if dut_hostname.endswith('servo'):
            # Servos are managed by Cisco POE switches.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(dut_hostname)
            if not switch_if_tuple:
                logging.error('Could not determine POE hostname for %s. '
                              'Please check the servo-interface mapping file.',
                              dut_hostname)
                return None
            rpm_hostname = switch_if_tuple[0]
            logging.info('POE hostname for DUT %s is %s', dut_hostname,
                         rpm_hostname)
        else:
            # Regular DUTs are managed by RPMs.
            rpm_hostname = re.sub('host[^.]*', 'rpm1', dut_hostname, count=1)
            logging.info('RPM hostname for DUT %s is %s', dut_hostname,
                         rpm_hostname)
        return rpm_hostname


    def _get_rpm_controller(self, rpm_hostname):
        """
        Private method that retrieves the appropriate RPMController instance
        for this RPM Hostname or calls _create_rpm_controller if it does not
        already exist.

        @param rpm_hostname: hostname of the RPM whose RPMController we want.

        @return: RPMController instance responsible for this RPM, or None if
                 rpm_hostname is empty.
        """
        if not rpm_hostname:
            return None
        # The lookup and the creation happen under one lock acquisition.
        # Taking the lock separately for the get and the put would let two
        # concurrent requests both miss and each build a controller for the
        # same RPM. _lock is not reentrant, so the dict is accessed directly
        # here rather than through _worker_dict_get/_worker_dict_put.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if not controller:
                controller = self._create_rpm_controller(rpm_hostname)
                self._worker_dict[rpm_hostname] = controller
            return controller


    def _create_rpm_controller(self, rpm_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The type is decided by the second to last dash-separated element of
        the hostname.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.

        @return: RPMController instance responsible for this RPM.

        @raise RPMInfrastructureException: Raised if the hostname has no
                                           dash-separated element to classify
                                           the device by.
        """
        hostname_elements = rpm_hostname.split('-')
        if len(hostname_elements) < 2:
            # Without this check the [-2] lookup below fails with a bare
            # IndexError that says nothing about the offending hostname.
            err_msg = ('Unable to determine the controller type for %s. '
                       'Expected a hostname with dash-separated elements.' %
                       rpm_hostname)
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
        device_element = hostname_elements[-2]
        if device_element == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            # NOTE(review): the controller receives the mapping object that is
            # current now; a later reload rebinds self._servo_interface, so
            # confirm whether existing POE controllers need the new mapping.
            return rpm_controller.CiscoPOEController(
                    rpm_hostname, self._servo_interface)
        # The device is an RPM.
        if self._WEBPOWERED_RACK_PATTERN.match(device_element):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(rpm_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
256
257
def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    A throwaway socket is bound to port 0 so the OS picks a free port, and the
    xmlrpc server is then created on that port number.

    @return: server,port - server object and the port that which it is listening
             to.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        # As before, the probe socket is still open while the server is
        # created (presumably to keep the port from being handed to another
        # process in between -- TODO confirm).
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        # Close the probe socket even when binding or server creation fails,
        # so a failed launch does not leak the descriptor.
        sock.close()
    return server, port
277
278
if __name__ == '__main__':
    # Main entry point used to launch the dispatch server. Creates an instance
    # of RPMDispatcher and registers it to a MultiThreadedXMLRPCServer
    # instance.
    if sys.argv[1:]:
        # Any command line argument is rejected with a usage message.
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    # Get the local ip _address and set the server to utilize it.
    address = socket.gethostbyname(socket.gethostname())
    server, port = launch_server_on_unused_port()
    rpm_dispatcher = RPMDispatcher(address, port)
    server.register_instance(rpm_dispatcher)
    server.serve_forever()
Scott Zawalski201d6be2012-09-21 15:56:25 -0400293 server.serve_forever()