blob: be3a2f55ee3977e36152a6d5397754cd0a1b8e03 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5
6import atexit
7import errno
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Simran Basi7498d202012-07-10 15:21:28 -070010import re
Scott Zawalski201d6be2012-09-21 15:56:25 -040011import sys
Simran Basi7498d202012-07-10 15:21:28 -070012import socket
13import threading
Simran Basi7498d202012-07-10 15:21:28 -070014import xmlrpclib
15
Fang Deng71c4b1f2013-05-20 09:55:04 -070016import rpm_controller
17import rpm_logging_config
18import utils
Simran Basi7498d202012-07-10 15:21:28 -070019from config import rpm_config
20from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
Simran Basi7498d202012-07-10 15:21:28 -070021from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070022
# Value of 'dispatcher_logname_format' from the GENERAL config section; it is
# handed to rpm_logging_config.set_up_logging() when the server is launched.
LOG_FILENAME_FORMAT = rpm_config.get(
        'GENERAL', 'dispatcher_logname_format')

# Path of the servo-interface mapping file (CiscoPOE config section). Its
# modification time is tracked so the mapping can be reloaded when it changes.
MAPPING_FILE = rpm_config.get(
        'CiscoPOE', 'servo_interface_mapping_file')
27
Simran Basi7498d202012-07-10 15:21:28 -070028
class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, who will field out outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _mapping_last_modified: Modification time of MAPPING_FILE as of the
                                 last time the servo mapping was (re)loaded.
    @var _port: Port assigned to this server instance.
    @var _servo_interface: Servo-interface mapping; looked up by servo
                           hostname, the first element of each value is the
                           hostname of the POE switch managing that servo.
    @var _worker_dict: Dictionary mapping RPM hostname's to RPMController
                       instances.
    """

    # Rack ids matching this pattern belong to web-powered RPMs; any other
    # rack id is treated as a Sentry CDU. Compiled once instead of per call.
    _WEBPOWERED_RACK_ID = re.compile('rack[0-9]+[a-z]+')


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initialized instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        self._frontend_server = rpm_config.get('RPM_INFRASTRUCTURE',
                                               'frontend_uri')
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    @staticmethod
    def _describe_socket_error(error):
        """
        Private helper that turns a socket error into a short description.

        Not every socket error carries a valid errno: socket.timeout has no
        errno at all and socket.gaierror uses resolver codes that are not in
        errno.errorcode. Indexing errno.errorcode directly would raise a
        KeyError from inside the except handler and hide the real failure.

        @param error: socket.error (or subclass) instance that was caught.

        @return: Symbolic errno name (e.g. 'ECONNREFUSED') when available,
                 otherwise the string form of the error.
        """
        return errno.errorcode.get(error.errno, str(error))


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, None if the key is
                 not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, dut_hostname, new_state):
        """
        Looks up the appropriate RPMController instance for this DUT and queues
        up the request.

        @param dut_hostname: hostname of the DUT whose outlet we are trying to
                             change.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise (including when no controller could be
                 determined for the DUT).
        """
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        controller = self._get_rpm_controller(dut_hostname)
        if controller is None:
            return False
        return controller.queue_request(dut_hostname, new_state)


    def _get_rpm_controller(self, dut_hostname):
        """
        Private method that retrieves the appropriate RPMController instance
        for this DUT, creating it via _create_rpm_controller if it does not
        already exist.

        @param dut_hostname: hostname of the DUT whose RPMController we want.

        @return: RPMController instance responsible for this DUT's RPM.
                 Return None on failure.
        """
        if dut_hostname.endswith('servo'):
            # Servos are managed by Cisco POE switches.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                # NOTE(review): POE controllers created earlier were handed the
                # previous mapping object; confirm whether they need refreshing.
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(dut_hostname)
            if not switch_if_tuple:
                logging.error('Could not determine POE hostname for %s. '
                              'Please check the servo-interface mapping file.',
                              dut_hostname)
                return None
            rpm_hostname = switch_if_tuple[0]
            logging.info('POE hostname for DUT %s is %s', dut_hostname,
                         rpm_hostname)
        else:
            # Regular DUTs are managed by RPMs.
            rpm_hostname = re.sub('host[^.]*', 'rpm1', dut_hostname, count=1)
            logging.info('RPM hostname for DUT %s is %s', dut_hostname,
                         rpm_hostname)
        # Look up and create under a single lock acquisition. Doing the get and
        # the put as two separately locked steps lets concurrent requests for
        # the same RPM each build their own controller. _lock is not
        # re-entrant, so the dict is accessed directly here rather than
        # through _worker_dict_get/_worker_dict_put.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if controller is None:
                controller = self._create_rpm_controller(rpm_hostname)
                if controller is not None:
                    self._worker_dict[rpm_hostname] = controller
        return controller


    def _create_rpm_controller(self, rpm_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The type is derived from the second-to-last '-' separated element of
        the hostname.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.

        @return: RPMController instance responsible for this RPM, or None if
                 the hostname does not contain enough '-' separated elements
                 to determine the controller type.
        """
        hostname_elements = rpm_hostname.split('-')
        if len(hostname_elements) < 2:
            logging.error('Could not determine the controller type for %s: '
                          'unexpected hostname format.', rpm_hostname)
            return None
        device_id = hostname_elements[-2]
        if device_id == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            return rpm_controller.CiscoPOEController(
                    rpm_hostname, self._servo_interface)
        # The device is an RPM; device_id is its rack id.
        if self._WEBPOWERED_RACK_ID.match(device_id):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(rpm_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
234
235
def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    A temporary socket is bound to port 0 so the OS picks a free port; that
    port number is then used to construct the MultiThreadedXMLRPCServer.

    @return: server,port - server object and the port on which it is
                           listening.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        # Always release the probe socket, even if binding or creating the
        # server fails, so the descriptor is not leaked.
        sock.close()
    return server, port
255
256
if __name__ == '__main__':
    # Script entry point: launches the dispatch server. Creates an instance
    # of RPMDispatcher and registers it to a MultiThreadedXMLRPCServer
    # instance, then serves requests until the process exits.
    if sys.argv[1:]:
        # This script takes no command line arguments.
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    # Get the local ip address and set the server to utilize it.
    address = socket.gethostbyname(socket.gethostname())
    server, port = launch_server_on_unused_port()
    dispatcher = RPMDispatcher(address, port)
    server.register_instance(dispatcher)
    server.serve_forever()
Scott Zawalski201d6be2012-09-21 15:56:25 -0400271 server.serve_forever()