blob: e7af41b4bdf1d5fee9d0f50c604eb95e9ab59ffa [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Simran Basi7498d202012-07-10 15:21:28 -070010import re
Scott Zawalski201d6be2012-09-21 15:56:25 -040011import sys
Simran Basi7498d202012-07-10 15:21:28 -070012import socket
13import threading
14import xmlrpclib
15
Fang Deng71c4b1f2013-05-20 09:55:04 -070016import rpm_logging_config
17import utils
Simran Basi7498d202012-07-10 15:21:28 -070018from config import rpm_config
19from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
20from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070021
# RPM count given to a newly registered dispatcher, and the sentinel count
# written into a dispatcher's heap entry when it is unregistered.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions of the fields inside a heap entry: [rpm_count, dispatcher_uri].
RPM_COUNT = 0
DISPATCHER_URI = 1

# Values read from the RPM configuration.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Matches the leading 'host...' component of a DUT hostname (everything up
# to the first '.'); it is replaced with DEFAULT_RPM_ID to derive the name
# of the RPM that controls the DUT.
RPM_REGEX = re.compile('host[^.]*')

# Path of the servo-interface mapping file.
MAPPING_FILE = rpm_config.get('CiscoPOE', 'servo_interface_mapping_file')
40
Simran Basi7498d202012-07-10 15:21:28 -070041
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their DUT power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap whose entries are lists of the format
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us
                      to invalidate the entry in the minheap. The value is
                      set to None once the dispatcher has been unregistered.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server. The value is set to None when the assigned
                    dispatcher has been unregistered.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to
                           (switch_hostname, interface).
    """


    def __init__(self):
        """
        RPMFrontendServer constructor.

        Initializes instance variables. Reads the modification time of the
        servo-interface mapping file and loads the mapping itself.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._rpm_dict = {}
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()


    def queue_request(self, dut_hostname, new_state):
        """
        Forwards a request to change a DUT's power state to the appropriate
        dispatcher server.

        This call will block until the forwarded request returns. If the
        chosen dispatcher cannot be reached it is unregistered and the
        request is retried against another dispatcher.

        @param dut_hostname: Hostname of the DUT whose power state we want to
                             change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          DUT's outlet to. Case-insensitive.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the dut
        # hostname.
        dut_hostname = dut_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set DUT: %s to invalid state %s',
                          dut_hostname, new_state)
            return False
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        dispatcher_uri = self._get_dispatcher(dut_hostname)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(dut_hostname, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # er.errno may be None or a non-errno code (timeouts, address
            # lookup failures), so fall back to the exception itself rather
            # than raising KeyError out of the error handler.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri,
                          errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            # Retry forwarding the request.
            return self.queue_request(dut_hostname, new_state)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        # Snapshot the URIs under the lock so that the (slow) RPC calls below
        # do not iterate the heap while another thread is re-ordering it.
        with self._lock:
            dispatcher_uris = [entry[DISPATCHER_URI]
                               for entry in self._dispatcher_minheap]
        for dispatcher_uri in dispatcher_uris:
            dispatcher = xmlrpclib.ServerProxy(dispatcher_uri)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_dispatcher(self, dut_hostname):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given DUT's RPM.

        Hostnames ending in 'servo' are resolved to their POE switch through
        the servo-interface mapping (reloaded first if the helper reports a
        newer mapping); all other hostnames are resolved to an RPM hostname
        by substituting DEFAULT_RPM_ID for the 'host...' prefix.

        @param dut_hostname: Hostname of the DUT whose dispatcher URI we want
                             to retrieve.

        @return: URI of dispatcher server responsible for this DUT's rpm. None
                 if no dispatcher servers are available or the servo has no
                 entry in the mapping.
        """
        if dut_hostname.endswith('servo'):
            # Servos are managed by Cisco POE switches.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(dut_hostname)
            if not switch_if_tuple:
                logging.error('Could not determine POE hostname for %s. '
                              'Please check the servo-interface mapping file.',
                              dut_hostname)
                return None
            rpm_hostname = switch_if_tuple[0]
        else:
            # Regular DUTs are managed by RPMs.
            rpm_hostname = RPM_REGEX.sub(DEFAULT_RPM_ID, dut_hostname, count=1)
        with self._lock:
            assigned_uri = self._rpm_dict.get(rpm_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for RPM %s.', rpm_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1. The same
            # list object is re-pushed so _entry_dict keeps pointing at it.
            heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for RPM %s', dispatcher_uri,
                         rpm_hostname)
            self._rpm_dict[rpm_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A
        URI that is already registered is left untouched: pushing a second
        heap entry would orphan the first one, which could then never be
        invalidated by unregister_dispatcher.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                logging.warning('%s is already registered.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Clears in _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.get(uri_to_unregister)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. Iterate over a copy of the items so
            # the dict can be updated inside the loop.
            for rpm, dispatcher in list(self._rpm_dict.items()):
                if dispatcher == uri_to_unregister:
                    self._rpm_dict[rpm] = None
            self._entry_dict[uri_to_unregister] = None
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top or the heap is empty.

        Callers are expected to hold _lock.
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)
283
284
if __name__ == '__main__':
    # Entry point used to launch the frontend server. Creates an instance of
    # RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer
    # instance, then serves requests until the process is stopped.
    if len(sys.argv) > 1:
        # The script takes no arguments; anything extra is a usage error.
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    frontend = RPMFrontendServer()
    listen_addr = rpm_config.get('RPM_INFRASTRUCTURE', 'frontend_addr')
    listen_port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    rpc_server = MultiThreadedXMLRPCServer((listen_addr, listen_port),
                                           allow_none=True)
    rpc_server.register_instance(frontend)
    logging.info('Listening on %s port %d', listen_addr, listen_port)
    rpc_server.serve_forever()
300 server.serve_forever()