Scott Zawalski | 201d6be | 2012-09-21 15:56:25 -0400 | [diff] [blame] | 1 | #!/usr/bin/python |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 2 | # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| 3 | # Use of this source code is governed by a BSD-style license that can be |
| 4 | # found in the LICENSE file. |
| 5 | |
| 6 | import errno |
| 7 | import heapq |
| 8 | import logging |
Fang Deng | 71c4b1f | 2013-05-20 09:55:04 -0700 | [diff] [blame] | 9 | import os |
Scott Zawalski | 201d6be | 2012-09-21 15:56:25 -0400 | [diff] [blame] | 10 | import sys |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 11 | import socket |
| 12 | import threading |
| 13 | import xmlrpclib |
| 14 | |
Fang Deng | 71c4b1f | 2013-05-20 09:55:04 -0700 | [diff] [blame] | 15 | import rpm_logging_config |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 16 | from config import rpm_config |
| 17 | from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer |
| 18 | from rpm_infrastructure_exception import RPMInfrastructureException |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 19 | |
Dan Shi | b7610b5 | 2014-05-06 11:09:49 -0700 | [diff] [blame] | 20 | import common |
Fang Deng | f63c978 | 2014-08-13 17:08:19 -0700 | [diff] [blame] | 21 | from autotest_lib.server import frontend |
Dan Shi | b7610b5 | 2014-05-06 11:09:49 -0700 | [diff] [blame] | 22 | from autotest_lib.site_utils.rpm_control_system import utils |
| 23 | |
# RPM count given to a freshly registered dispatcher, and the sentinel count
# that marks an unregistered dispatcher for removal from the min heap.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions of the fields inside each [rpm_count, dispatcher_uri] heap entry.
RPM_COUNT, DISPATCHER_URI = 0, 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a request may ask for (requests are upper-cased before the
# membership check).
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file, resolved relative to this module's directory.
MAPPING_FILE = os.path.join(os.path.dirname(__file__),
                            rpm_config.get('CiscoPOE',
                                           'servo_interface_mapping_file'))

# Capacity of the LRU cache holding per-device power management unit
# information (e.g. rpm_hostname, outlet, hydra_hostname).
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
| 45 | |
Simran Basi | 7498d20 | 2012-07-10 15:21:28 -0700 | [diff] [blame] | 46 | |
class DispatcherDownException(Exception):
    """Signals that one specific RPMDispatcher could not be reached.

    Raised with the unreachable dispatcher's URI as its argument.
    """
    pass
| 49 | |
| 50 | |
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server, which will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server, which will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap of [num_rpms, dispatcher_uri] entries.
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to its entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us
                      to invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps power unit hostnames to an already assigned
                    dispatcher server URI.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables.

        @param email_handler: Optional handler used by suspend_emails and
                              resume_emails; those calls are no-ops when it
                              is None.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set servo %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set servo: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_poe_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the device to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise (including an invalid new_state).

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Normalise and validate the state like the other entry points do,
        # instead of forwarding an arbitrary string to the dispatcher.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s '
                     'via RPM %s', device_hostname, new_state, rpm_hostname)
        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        return self._queue_with_retry(powerunit_info, new_state)


    def queue_request(self, device_hostname, new_state):
        """
        Forwards a request to change a device's (a dut or a servo) power state
        to the appropriate dispatcher server.

        This call will block until the forwarded request returns.

        @param device_hostname: Hostname of the device whose power state we want
                                to change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the hostname.
        device_hostname = device_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def _queue_with_retry(self, powerunit_info, new_state):
        """Queue a request, retrying whenever the chosen dispatcher is down.

        _queue_once unregisters an unreachable dispatcher before raising
        DispatcherDownException, so a retry normally lands on a different
        dispatcher. Once none remain, _queue_once raises
        RPMInfrastructureException, which ends the loop.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Validated, upper-case power state.

        @return: The dispatcher's result for the request.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        while True:
            try:
                return self._queue_once(powerunit_info, new_state)
            except DispatcherDownException:
                # Retry forwarding the request.
                continue


    def _queue_once(self, powerunit_info, new_state):
        """Queue one request to the dispatcher responsible for the power unit.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Validated, upper-case power state.

        @return: Whatever the dispatcher's queue_request call returns.

        @raise RPMInfrastructureException: No dispatchers are registered, or
                                           none of them can be reached.
        @raise DispatcherDownException: The chosen dispatcher was unreachable
                                        and has been unregistered; the caller
                                        may retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # er.errno can be None (e.g. socket.timeout) or a code missing
            # from errno.errorcode (e.g. negative socket.gaierror codes); a
            # plain lookup would raise KeyError here and skip the retry path.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri, errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        for dispatcher_entry in self._dispatcher_minheap:
            dispatcher = xmlrpclib.ServerProxy(
                    dispatcher_entry[DISPATCHER_URI], allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo.
        1) ChromeOS dut
           Chromeos dut is managed by RPM. The related information
           we need to know include rpm hostname, rpm outlet, hydra hostname.
           Such information can be retrieved from afe_host_attributes table
           from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
           Servo is managed by POE. The related information we need to know
           include poe hostname, poe interface. Such information is
           stored in a local file and read into memory.

        Hostnames ending in 'servo' are treated as servos.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        if device_hostname.endswith('servo'):
            return self._get_poe_powerunit_info(device_hostname)
        return self._get_rpm_powerunit_info(device_hostname)


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory; the file is reloaded
        when utils reports it has changed.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        with self._lock:
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
            if not switch_if_tuple:
                # Format the hostname into the message; passing it as a
                # second constructor argument would leave '%s' unexpanded.
                raise RPMInfrastructureException(
                        'Could not determine POE hostname for %s. '
                        'Please check the servo-interface mapping file.' %
                        device_hostname)
            return utils.PowerUnitInfo(
                    device_hostname=device_hostname,
                    powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                    powerunit_hostname=switch_if_tuple[0],
                    outlet=switch_if_tuple[1],
                    hydra_hostname=None)


    def _get_rpm_powerunit_info(self, device_hostname):
        """Get the power management unit information for an RPM controller.

        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        with self._lock:
            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        A new assignment goes to the dispatcher with the lowest RPM count,
        whose count is then incremented.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            if self._rpm_dict.get(powerunit_hostname):
                return self._rpm_dict[powerunit_hostname]
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A
        URI that is already registered keeps its existing entry.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                # A second heap entry would be orphaned: _entry_dict can only
                # reference one of them, so unregister_dispatcher could never
                # terminate the other and it could keep receiving work.
                logging.info('%s is already registered as a rpm dispatcher.',
                             dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.pop(uri_to_unregister, None)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings so those units are reassigned on their
            # next request.
            stale_hostnames = [hostname for hostname, uri
                               in self._rpm_dict.items()
                               if uri == uri_to_unregister]
            for hostname in stale_hostnames:
                del self._rpm_dict[hostname]
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top (or the heap is empty).

        Called from unregister_dispatcher with self._lock held.
        """
        heap = self._dispatcher_minheap
        # heapq keeps the smallest entry at index 0. TERMINATED (-1) sorts
        # below every live count (which starts at 0 and only increments), so
        # terminated entries surface at the top after heapify.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        No-op when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications (no-op without an email handler)."""
        if self._email_handler:
            self._email_handler.resume_emails()
| 471 | |
| 472 | |
if __name__ == '__main__':
    # Launch the frontend server: build an RPMFrontendServer, register it with
    # a MultiThreadedXMLRPCServer and serve requests via serve_forever().
    if len(sys.argv) != 2:
        print('Usage: ./%s <log_file_dir>.' % sys.argv[0])
        sys.exit(1)

    log_dir = sys.argv[1]
    email_handler = rpm_logging_config.set_up_logging_to_file(
            log_dir, LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)

    # External clients are assumed to always connect via this machine's
    # hostname, so binding to it selects the right network interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')

    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()