blob: bb5d33e065157623b968f2f5fbc03caef3babd50 [file] [log] [blame]
Mike Frysingerd03e6b52019-08-03 12:49:01 -04001#!/usr/bin/python2
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Scott Zawalski201d6be2012-09-21 15:56:25 -040010import sys
Simran Basi7498d202012-07-10 15:21:28 -070011import socket
12import threading
13import xmlrpclib
14
Fang Deng71c4b1f2013-05-20 09:55:04 -070015import rpm_logging_config
Simran Basi7498d202012-07-10 15:21:28 -070016from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070019
Dan Shib7610b52014-05-06 11:09:49 -070020import common
21from autotest_lib.site_utils.rpm_control_system import utils
22
# RPM count given to a freshly registered dispatcher's heap entry, and the
# sentinel count written into an entry when its dispatcher is unregistered.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions of the fields inside a heap entry: [rpm_count, dispatcher_uri].
RPM_COUNT = 0
DISPATCHER_URI = 1

# Values read from the rpm config file.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file; the configured name is resolved relative to
# the directory that contains this module.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
44
Simran Basi7498d202012-07-10 15:21:28 -070045
class DispatcherDownException(Exception):
    """Signals that one specific RPMDispatcher could not be reached.

    Raised with the URI of the unreachable dispatcher as its only argument.
    """
48
49
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap whose entries are lists of the format
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us
                      to invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to
                           (switch_hostname, interface).
    @var _rpm_info: An LRU cache of LRU_SIZE entries.
                    NOTE(review): no method of this class reads or writes
                    it; confirm it is unused before removing.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables and loads the servo-interface
        mapping, remembering the mapping file's modification time.

        @param email_handler: Optional handler that suspend_emails() and
                              resume_emails() delegate to.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set servo %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set servo: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_poe_powerunit_info(device_hostname)
        try:
            return self._queue_once(powerunit_info, new_state)
        except DispatcherDownException:
            # The failed dispatcher has been unregistered by _queue_once;
            # retry forwarding the request so another one gets picked.
            return self.set_power_via_poe(device_hostname, new_state)


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the device to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s',
                     device_hostname, new_state)

        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        try:
            return self._queue_once(powerunit_info, new_state)
        except DispatcherDownException:
            # The failed dispatcher has been unregistered by _queue_once;
            # retry forwarding the request so another one gets picked.
            return self.set_power_via_rpm(device_hostname, rpm_hostname,
                                          rpm_outlet, hydra_hostname,
                                          new_state)


    def _queue_once(self, powerunit_info, new_state):
        """Queue one request to the dispatcher.

        @param powerunit_info: A PowerUnitInfo instance describing the power
                               unit to act on.
        @param new_state: The (already validated) state to request.

        @return: The result of the dispatcher's queue_request call.

        @raise RPMInfrastructureException: No dispatcher is registered, or
                                           no dispatcher can be reached.
        @raise DispatcherDownException: The chosen dispatcher could not be
                                        reached and has been unregistered;
                                        the caller may retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # Not every socket error carries an errno that errorcode knows
            # (it may be None or a resolver code), so fall back to the
            # error itself rather than raising KeyError from this handler.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri,
                          errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an
                # Exception to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm
                 infrastructure can still function. True otherwise.
        """
        for dispatcher_entry in self._dispatcher_minheap:
            dispatcher = xmlrpclib.ServerProxy(
                    dispatcher_entry[DISPATCHER_URI], allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is
                    # fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory; the in-memory copy is
        refreshed here when the reload helper reports new data.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                (self._mapping_last_modified,
                 self._servo_interface) = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
            if not switch_if_tuple:
                # Interpolate the hostname into the message; passing it as
                # a second constructor argument would leave '%s' unfilled.
                raise RPMInfrastructureException(
                        'Could not determine POE hostname for %s. '
                        'Please check the servo-interface mapping file.' %
                        device_hostname)
            return utils.PowerUnitInfo(
                    device_hostname=device_hostname,
                    powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                    powerunit_hostname=switch_if_tuple[0],
                    outlet=switch_if_tuple[1],
                    hydra_hostname=None)


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        A power unit that already has a dispatcher keeps it; otherwise the
        least loaded dispatcher is taken from the min heap, its RPM count is
        incremented and the assignment is remembered in _rpm_dict.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            assigned_uri = self._rpm_dict.get(powerunit_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher.
        Registering a URI that already has a live entry is a no-op.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                # _entry_dict tracks a single heap entry per URI. Pushing a
                # second entry would orphan the first one in the heap, where
                # unregister_dispatcher could never terminate it.
                logging.warning('%s is already registered; keeping its '
                                'existing entry.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Clears from _rpm_dict all assignments to this dispatcher so that those
        RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.get(uri_to_unregister)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. Only values of existing keys are
            # rewritten, so the dict does not change size while iterating.
            for rpm, dispatcher in self._rpm_dict.items():
                if dispatcher == uri_to_unregister:
                    self._rpm_dict[rpm] = None
            self._entry_dict[uri_to_unregister] = None
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top, or the heap is empty.
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index. Compare
        # by value: identity of equal ints is an interpreter detail.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
380
381
if __name__ == '__main__':
    # Entry point used to launch the frontend server: builds an
    # RPMFrontendServer and registers it with a MultiThreadedXMLRPCServer
    # instance, then serves requests until the process is stopped.
    if len(sys.argv) != 2:
        # Exactly one argument is expected: the directory for log files.
        print('Usage: ./%s <log_file_dir>.' % sys.argv[0])
        sys.exit(1)

    # Logging setup hands back the email handler the server controls.
    email_handler = rpm_logging_config.set_up_logging_to_file(
            sys.argv[1], LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)

    # We assume that external clients will always connect to us via the
    # hostname, so listening on the hostname ensures we pick the right network
    # interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')

    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()
402 server.serve_forever()