#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Simran Basi7498d202012-07-10 15:21:28 -070010import re
Scott Zawalski201d6be2012-09-21 15:56:25 -040011import sys
Simran Basi7498d202012-07-10 15:21:28 -070012import socket
13import threading
14import xmlrpclib
15
Fang Deng71c4b1f2013-05-20 09:55:04 -070016import rpm_logging_config
17import utils
Simran Basi7498d202012-07-10 15:21:28 -070018from config import rpm_config
19from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
20from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070021
# RPM count given to a dispatcher's heap entry when it registers.
DEFAULT_RPM_COUNT = 0
# RPM count written into a heap entry to mark its dispatcher as unregistered.
TERMINATED = -1

# Indexes for accessing heap entries ([rpm_count, dispatcher_uri]).
RPM_COUNT = 0
DISPATCHER_URI = 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Valid state values.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# RPM Hostname regex: matches the 'host...' portion of a DUT hostname, which
# is substituted with DEFAULT_RPM_ID to derive the RPM hostname.
RPM_REGEX = re.compile('host[^.]*')

# Servo-interface mapping file, resolved relative to this module's directory.
_MODULE_DIR = os.path.dirname(__file__)
_MAPPING_FILE_NAME = rpm_config.get('CiscoPOE', 'servo_interface_mapping_file')
MAPPING_FILE = os.path.join(_MODULE_DIR, _MAPPING_FILE_NAME)
Fang Deng71c4b1f2013-05-20 09:55:04 -070042
Simran Basi7498d202012-07-10 15:21:28 -070043
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their DUT power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap whose entries are lists of the format
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us to
                      invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables and loads the servo-interface mapping.

        @param email_handler: Optional handler used by suspend_emails() and
                              resume_emails(); when None those calls are
                              no-ops.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._rpm_dict = {}
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._email_handler = email_handler


    def queue_request(self, dut_hostname, new_state):
        """
        Forwards a request to change a DUT's power state to the appropriate
        dispatcher server.

        This call will block until the forwarded request returns. If the
        chosen dispatcher cannot be reached it is unregistered and the request
        is retried against the remaining dispatchers.

        @param dut_hostname: Hostname of the DUT whose power state we want to
                             change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          DUT's outlet to. Case-insensitive.

        @return: True if the attempt to change power state was successful,
                 False otherwise (including when new_state is invalid).

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the dut
        # hostname.
        dut_hostname = dut_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set DUT: %s to invalid state %s',
                          dut_hostname, new_state)
            return False
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        dispatcher_uri = self._get_dispatcher(dut_hostname)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(dut_hostname, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # er.errno can be None (e.g. socket.timeout) or a code without a
            # symbolic name (e.g. getaddrinfo errors); indexing errorcode
            # directly would raise KeyError here and hide the real failure.
            error_name = errno.errorcode.get(er.errno, str(er))
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri, error_name)
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            # Retry forwarding the request. Each retry removes one dispatcher,
            # so the recursion depth is bounded by the number registered.
            return self.queue_request(dut_hostname, new_state)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        for dispatcher_entry in self._dispatcher_minheap:
            dispatcher = xmlrpclib.ServerProxy(dispatcher_entry[DISPATCHER_URI])
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_dispatcher(self, dut_hostname):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given DUT's RPM.

        Hostnames ending in 'servo' are resolved to their POE switch through
        the servo-interface mapping (reloaded first if necessary); all other
        hostnames are mapped to an RPM hostname via RPM_REGEX.

        This method does not verify that the returned dispatcher is reachable.

        @param dut_hostname: Hostname of the DUT whose dispatcher URI we want
                             to retrieve.

        @return: URI of dispatcher server responsible for this DUT's rpm. None
                 if no dispatcher servers are available or the servo has no
                 entry in the mapping file.
        """
        if dut_hostname.endswith('servo'):
            # Servos are managed by Cisco POE switches.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(dut_hostname)
            if not switch_if_tuple:
                logging.error('Could not determine POE hostname for %s. '
                              'Please check the servo-interface mapping file.',
                              dut_hostname)
                return None
            rpm_hostname = switch_if_tuple[0]
        else:
            # Regular DUTs are managed by RPMs.
            rpm_hostname = RPM_REGEX.sub(DEFAULT_RPM_ID, dut_hostname, count=1)
        with self._lock:
            # A None value means the previous dispatcher was unregistered, so
            # treat it the same as "never assigned".
            assigned_uri = self._rpm_dict.get(rpm_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for RPM %s.', rpm_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1. The same
            # list object is reused so _entry_dict keeps pointing at it.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for RPM %s', dispatcher_uri,
                         rpm_hostname)
            self._rpm_dict[rpm_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Clears in _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.get(uri_to_unregister)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. Only values of existing keys are
            # replaced, so the dict's size is unchanged while iterating.
            for rpm, dispatcher in self._rpm_dict.items():
                if dispatcher == uri_to_unregister:
                    self._rpm_dict[rpm] = None
            self._entry_dict[uri_to_unregister] = None
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top or the heap is empty.
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index. Compare
        # by value: identity ('is') on ints only works by interpreter accident.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
302
303
if __name__ == '__main__':
    # Main entry point used to launch the frontend server. Creates an instance
    # of RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer
    # instance.
    if len(sys.argv) > 1:
        # Single parenthesised argument: prints the same text under the
        # Python 2 print statement this file is written for.
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)

    # Logging must be configured first; it also yields the email handler the
    # server uses to suspend/resume notifications.
    email_handler = rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)

    address = rpm_config.get('RPM_INFRASTRUCTURE', 'frontend_addr')
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    bind_address = (address, port)

    server = MultiThreadedXMLRPCServer(bind_address, allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()