blob: 67396bd55fb053442ae93c5b5eafa41690a8799b [file] [log] [blame]
Simran Basi7498d202012-07-10 15:21:28 -07001# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import atexit
6import errno
7import logging
8import re
9import socket
10import threading
11import time
12import xmlrpclib
13
14from config import rpm_config
15from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
16import rpm_controller
17from rpm_infrastructure_exception import RPMInfrastructureException
18import rpm_logging_config
19
# Log file name format for the dispatcher, read from the GENERAL section of
# the rpm config.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'dispatcher_logname_format')
21
22
class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, which will hand outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _port: Port assigned to this server instance.
    @var _worker_dict: Dictionary mapping RPM hostnames to RPMController
                       instances.
    """

    # Rack ids shaped like rack<digits><letters> select the web powered
    # controller; every other rack id selects the Sentry controller.
    _WEBPOWERED_RACK_ID = re.compile('rack[0-9]+[a-z]+')


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initializes instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        self._frontend_server = rpm_config.get('RPM_INFRASTRUCTURE',
                                               'frontend_uri')
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    @staticmethod
    def _describe_socket_error(er):
        """
        Returns a short description of a socket error for error messages.

        Not every socket error carries an errno that is a key of
        errno.errorcode (e.g. timeouts have no errno and address lookup
        failures use their own error codes). Looking the code up with [] would
        raise KeyError from inside the error handler and hide the real
        failure, so fall back to the error's own text instead.

        @param er: The socket.error instance that was caught.

        @return: The symbolic errno name (e.g. 'ECONNREFUSED') when known,
                 otherwise str(er).
        """
        return errno.errorcode.get(getattr(er, 'errno', None), str(er))


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, or None if the key
                 is not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, dut_hostname, new_state):
        """
        Looks up the appropriate RPMController instance for this DUT and queues
        up the request.

        @param dut_hostname: hostname of the DUT whose outlet we are trying to
                             change.
        @param new_state: [ON, OFF, CYCLE] state we want to change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise.
        """
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        # Named 'controller' so the rpm_controller module is not shadowed.
        controller = self._get_rpm_controller(dut_hostname)
        return controller.queue_request(dut_hostname, new_state)


    def _get_rpm_controller(self, dut_hostname):
        """
        Private method that retrieves the appropriate RPMController instance
        for this DUT, or calls _create_rpm_controller if it does not already
        exist.

        The lookup and the creation happen under a single hold of _lock so
        that concurrent requests for the same RPM cannot each create their own
        controller.

        @param dut_hostname: hostname of the DUT whose RPMController we want.

        @return: RPMController instance responsible for this DUT's RPM.
        """
        # The RPM hostname is the DUT hostname with its first 'host...' label
        # (up to the next '.') replaced by 'rpm1'.
        rpm_hostname = re.sub('host[^.]*', 'rpm1', dut_hostname, count=1)
        logging.info('RPM hostname for DUT %s is %s', dut_hostname,
                     rpm_hostname)
        # _lock is not reentrant, so _worker_dict is accessed directly here
        # rather than through _worker_dict_get/_worker_dict_put.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if not controller:
                controller = self._create_rpm_controller(rpm_hostname)
                self._worker_dict[rpm_hostname] = controller
        return controller


    def _create_rpm_controller(self, rpm_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The rack id is taken to be the second to last '-' separated element of
        the RPM hostname.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.

        @return: RPMController instance responsible for this RPM.

        @raise IndexError: If rpm_hostname does not contain a '-'.
        """
        hostname_elements = rpm_hostname.split('-')
        rack_id = hostname_elements[-2]
        if self._WEBPOWERED_RACK_ID.match(rack_id):
            logging.info('RPM is a webpowered device')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        else:
            logging.info('RPM is a Sentry CDU device')
            return rpm_controller.SentryRPMController(rpm_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
198
199
def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    A throwaway socket is bound to port 0 to learn a free port number, and the
    server is then created on that port. The throwaway socket is always closed,
    even if binding it or constructing the server fails.

    @return: server,port - server object and the port on which it is
             listening.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Port 0 asks the OS to pick a free port; read back which one it chose.
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        # The placeholder socket is still open while the server is created,
        # exactly as before. NOTE(review): presumably this keeps the port from
        # being handed to another process in the meantime - confirm.
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        sock.close()
    return server, port
219
220
if __name__ == '__main__':
    # Entry point used to launch the dispatch server: creates an instance of
    # RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance.
    rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    # The dispatcher advertises this host's IP address to the frontend.
    local_ip = socket.gethostbyname(socket.gethostname())
    xmlrpc_server, listen_port = launch_server_on_unused_port()
    xmlrpc_server.register_instance(RPMDispatcher(local_ip, listen_port))
    xmlrpc_server.serve_forever()