Scott Zawalski | 201d6be | 2012-09-21 15:56:25 -0400 | [diff] [blame] | 1 | #!/usr/bin/python |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 2 | # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| 3 | # Use of this source code is governed by a BSD-style license that can be |
| 4 | # found in the LICENSE file. |
| 5 | |
| 6 | import errno |
| 7 | import heapq |
| 8 | import logging |
Fang Deng | 71c4b1f | 2013-05-20 09:55:04 -0700 | [diff] [blame] | 9 | import os |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 10 | import re |
Scott Zawalski | 201d6be | 2012-09-21 15:56:25 -0400 | [diff] [blame] | 11 | import sys |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 12 | import socket |
| 13 | import threading |
| 14 | import xmlrpclib |
| 15 | |
Fang Deng | 71c4b1f | 2013-05-20 09:55:04 -0700 | [diff] [blame] | 16 | import rpm_logging_config |
| 17 | import utils |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 18 | from config import rpm_config |
| 19 | from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer |
| 20 | from rpm_infrastructure_exception import RPMInfrastructureException |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 21 | |
# RPM count a dispatcher's heap entry starts with when it registers.
DEFAULT_RPM_COUNT = 0
# RPM count written into a heap entry when its dispatcher is unregistered.
TERMINATED = -1

# Positions of the two fields inside a heap entry:
# [rpm_count, dispatcher_uri].
RPM_COUNT = 0
DISPATCHER_URI = 1

# Values read from the RPM configuration.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Matches 'host' plus everything up to (not including) the next '.' in a
# hostname; the match is replaced by DEFAULT_RPM_ID to get the RPM hostname.
RPM_REGEX = re.compile('host[^.]*')

# Servo-interface mapping file. The configured file name is resolved
# relative to the directory that contains this module.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))
Fang Deng | 71c4b1f | 2013-05-20 09:55:04 -0700 | [diff] [blame] | 42 | |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 43 | |
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their DUT power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap whose entries are lists of format-
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to its entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us
                      to invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables. Reads the modification time of the
        servo-interface mapping file and loads the mapping.

        @param email_handler: Optional handler used by suspend_emails() and
                              resume_emails(). When None those calls do
                              nothing.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._rpm_dict = {}
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._email_handler = email_handler


    def queue_request(self, dut_hostname, new_state):
        """
        Forwards a request to change a DUT's power state to the appropriate
        dispatcher server.

        This call will block until the forwarded request returns. If the
        chosen dispatcher cannot be reached but at least one other dispatcher
        is up, the unreachable one is unregistered and the request is retried.

        @param dut_hostname: Hostname of the DUT whose power state we want to
                             change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          DUT's outlet to. Case insensitive.

        @return: The dispatcher's result for the request; False if new_state
                 is not a valid state.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the dut
        # hostname.
        dut_hostname = dut_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set DUT: %s to invalid state %s',
                          dut_hostname, new_state)
            return False
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        dispatcher_uri = self._get_dispatcher(dut_hostname)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(dut_hostname, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # er.errno can be None or a code that errno.errorcode does not
            # know; fall back to the error text so that building the log
            # message can never raise and hide the original failure.
            error_name = errno.errorcode.get(er.errno, str(er))
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri, error_name)
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            # Retry forwarding the request.
            return self.queue_request(dut_hostname, new_state)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        for dispatcher_entry in self._dispatcher_minheap:
            dispatcher = xmlrpclib.ServerProxy(dispatcher_entry[DISPATCHER_URI])
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_dispatcher(self, dut_hostname):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given DUT's RPM.

        Hostnames ending in 'servo' are resolved to their POE switch through
        the servo-interface mapping (reloaded first if the helper reports a
        newer version); all other hostnames are turned into an RPM hostname
        with RPM_REGEX and DEFAULT_RPM_ID.

        @param dut_hostname: Hostname of the DUT whose dispatcher URI we want
                             to retrieve.

        @return: URI of dispatcher server responsible for this DUT's rpm. None
                 if no dispatcher servers are available or the servo has no
                 entry in the mapping.
        """
        if dut_hostname.endswith('servo'):
            # Servos are managed by Cisco POE switches.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(dut_hostname)
            if not switch_if_tuple:
                logging.error('Could not determine POE hostname for %s. '
                              'Please check the servo-interface mapping file.',
                              dut_hostname)
                return None
            rpm_hostname = switch_if_tuple[0]
        else:
            # Regular DUTs are managed by RPMs.
            rpm_hostname = RPM_REGEX.sub(DEFAULT_RPM_ID, dut_hostname, count=1)
        with self._lock:
            assigned_uri = self._rpm_dict.get(rpm_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for RPM %s.', rpm_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            if not self._dispatcher_minheap:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            heap_entry = heapq.heappop(self._dispatcher_minheap)
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1. The same
            # list object is re-used because _entry_dict refers to it.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for RPM %s', dispatcher_uri,
                         rpm_hostname)
            self._rpm_dict[rpm_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A URI
        that is already registered is left untouched: pushing a second heap
        entry for it would leave an entry behind that unregister_dispatcher
        can no longer find and invalidate.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                logging.warning('%s is already registered.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests. Also
        called by queue_request when a dispatcher cannot be reached.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.get(uri_to_unregister)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. The keys are collected first so the
            # dict is not resized while it is being iterated.
            orphaned_rpms = [rpm for rpm, dispatcher in self._rpm_dict.items()
                             if dispatcher == uri_to_unregister]
            for rpm in orphaned_rpms:
                del self._rpm_dict[rpm]
            del self._entry_dict[uri_to_unregister]
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top or the heap is empty.
        """
        # Heapq guarantees the head of the heap is in the '0' index.
        heap = self._dispatcher_minheap
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
| 303 | |
if __name__ == '__main__':
    # Entry point used to launch the frontend server. Creates an instance of
    # RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer
    # instance, then serves requests until the process is stopped.
    if sys.argv[1:]:
        # The script takes no command line arguments.
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    # Logging is configured first; the returned handler is passed to the
    # server so it can suspend and resume email notifications.
    handler = rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT)
    frontend = RPMFrontendServer(email_handler=handler)
    host = rpm_config.get('RPM_INFRASTRUCTURE', 'frontend_addr')
    port_number = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    rpc_server = MultiThreadedXMLRPCServer((host, port_number),
                                           allow_none=True)
    rpc_server.register_instance(frontend)
    logging.info('Listening on %s port %d', host, port_number)
    rpc_server.serve_forever()