blob: 7e5b82fdf29998f12e5d625303e758c63696af34 [file] [log] [blame]
Scott Zawalski201d6be2012-09-21 15:56:25 -04001#!/usr/bin/python
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Scott Zawalski201d6be2012-09-21 15:56:25 -040010import sys
Simran Basi7498d202012-07-10 15:21:28 -070011import socket
12import threading
13import xmlrpclib
14
Fang Deng71c4b1f2013-05-20 09:55:04 -070015import rpm_logging_config
Simran Basi7498d202012-07-10 15:21:28 -070016from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070019
Dan Shib7610b52014-05-06 11:09:49 -070020import common
Fang Dengf63c9782014-08-13 17:08:19 -070021from autotest_lib.server import frontend
Dan Shib7610b52014-05-06 11:09:49 -070022from autotest_lib.site_utils.rpm_control_system import utils
23
# Load assigned to a newly registered dispatcher, and the sentinel load that
# marks an unregistered dispatcher so it sorts to the head of the min heap,
# where it is popped off.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions of the fields inside a [rpm_count, dispatcher_uri] heap entry.
RPM_COUNT, DISPATCHER_URI = 0, 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file; the configured name is resolved relative to
# the directory containing this module.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Capacity of the LRU cache holding per-device power management unit
# information (e.g. rpm_hostname, outlet, hydra_hostname).
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
45
Simran Basi7498d202012-07-10 15:21:28 -070046
class DispatcherDownException(Exception):
    """Signals that the RPMDispatcher chosen for a request is down.

    The exception argument is conventionally the unreachable dispatcher URI.
    """
    pass
49
50
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap that returns a list of format-
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us
                      to invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps power unit (RPM or POE) hostnames to an already
                    assigned dispatcher server URI.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables.

        @param email_handler: Optional handler used by suspend_emails() and
                              resume_emails(); None disables both.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def _normalize_state(self, device_hostname, new_state,
                         device_kind='device'):
        """Upper-case a requested power state and validate it.

        Logs the incoming request (or the rejection) in the same format used
        by every public entry point.

        @param device_hostname: Hostname the request is for (used in logs).
        @param new_state: Requested state, any letter case.
        @param device_kind: Word used in log messages, e.g. 'device'/'servo'.

        @return: The upper-cased state if it is in VALID_STATE_VALUES,
                 otherwise None.
        """
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set %s %s to invalid '
                          'state %s', device_kind, device_hostname, new_state)
            return None
        logging.info('Received request to set %s: %s to state: %s',
                     device_kind, device_hostname, new_state)
        return new_state


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = self._normalize_state(device_hostname, new_state, 'servo')
        if new_state is None:
            return False
        powerunit_info = self._get_poe_powerunit_info(device_hostname)
        try:
            return self._queue_once(powerunit_info, new_state)
        except DispatcherDownException:
            # The unreachable dispatcher has been unregistered; retry so the
            # request is forwarded to a newly assigned dispatcher.
            return self.set_power_via_poe(device_hostname, new_state)


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the servo to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        new_state = self._normalize_state(device_hostname, new_state)
        if new_state is None:
            return False

        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        try:
            return self._queue_once(powerunit_info, new_state)
        except DispatcherDownException:
            # The unreachable dispatcher has been unregistered; retry so the
            # request is forwarded to a newly assigned dispatcher.
            return self.set_power_via_rpm(device_hostname, rpm_hostname,
                                          rpm_outlet, hydra_hostname,
                                          new_state)


    def queue_request(self, device_hostname, new_state):
        """
        Forwards a request to change a device's (a dut or a servo) power state
        to the appropriate dispatcher server.

        This call will block until the forwarded request returns.

        @param device_hostname: Hostname of the device whose power state we want
                                to change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = self._normalize_state(device_hostname, new_state)
        if new_state is None:
            return False
        powerunit_info = self._get_powerunit_info(device_hostname)
        try:
            return self._queue_once(powerunit_info, new_state)
        except DispatcherDownException:
            # The unreachable dispatcher has been unregistered; retry so the
            # request is forwarded to a newly assigned dispatcher.
            return self.queue_request(device_hostname, new_state)


    def _queue_once(self, powerunit_info, new_state):
        """Queue one request to the dispatcher responsible for the power unit.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Validated, upper-cased power state.

        @return: The result returned by the dispatcher's queue_request().

        @raise RPMInfrastructureException: No dispatchers are registered, or
                                           none of them can be reached.
        @raise DispatcherDownException: The chosen dispatcher could not be
                                        reached and has been unregistered;
                                        the caller should retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # er.errno may be None or a value absent from errno.errorcode;
            # fall back to the exception itself rather than letting a
            # KeyError escape and skip the unregistration below.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri,
                          errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        # Snapshot the registered URIs under the lock so other threads can
        # keep mutating the heap; the network probes happen outside the lock.
        with self._lock:
            dispatcher_uris = [entry[DISPATCHER_URI]
                               for entry in self._dispatcher_minheap]
        for dispatcher_uri in dispatcher_uris:
            dispatcher = xmlrpclib.ServerProxy(dispatcher_uri,
                                               allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo; hostnames ending in
        'servo' are treated as servos.
        1) ChromeOS dut
           Chromeos dut is managed by RPM. The related information
           we need to know include rpm hostname, rpm outlet, hydra hostname.
           Such information can be retrieved from afe_host_attributes table
           from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
           Servo is managed by POE. The related information we need to know
           include poe hostname, poe interface. Such information is
           stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        if device_hostname.endswith('servo'):
            return self._get_poe_powerunit_info(device_hostname)
        else:
            return self._get_rpm_powerunit_info(device_hostname)


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory; the file is reloaded
        here if it has been modified since it was last read.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        with self._lock:
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
            if not switch_if_tuple:
                raise RPMInfrastructureException(
                        'Could not determine POE hostname for %s. '
                        'Please check the servo-interface mapping file.'
                        % device_hostname)
            return utils.PowerUnitInfo(
                    device_hostname=device_hostname,
                    powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                    powerunit_hostname=switch_if_tuple[0],
                    outlet=switch_if_tuple[1],
                    hydra_hostname=None)


    def _get_rpm_powerunit_info(self, device_hostname):
        """Get the power management unit information for an RPM controller.

        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.
        """
        with self._lock:
            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        A power unit without an assignment is given the registered
        dispatcher with the lowest assignment count, whose count is then
        incremented.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            if self._rpm_dict.get(powerunit_hostname):
                return self._rpm_dict[powerunit_hostname]
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A URI
        that is already registered is left as-is: pushing a second heap entry
        would orphan the first, which unregister_dispatcher() could then never
        remove.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                logging.warning('%s is already registered.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher. This is done
        even if the URI is not currently registered, so a retried request can
        never be routed back to it.

        Assigns an rpm count of TERMINATED (-1) to this dispatcher's heap entry
        so that it sorts to the head of the min heap and is popped off.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            # Remove all power unit mappings pointing at this dispatcher.
            stale_hosts = [host for host, uri in self._rpm_dict.items()
                           if uri == uri_to_unregister]
            for host in stale_hosts:
                del self._rpm_dict[host]
            heap_entry = self._entry_dict.pop(uri_to_unregister, None)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            heap_entry[RPM_COUNT] = TERMINATED
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Pop entries off the head of the heap until its head is a
        non-terminated dispatcher or the heap is empty.

        Must be called with self._lock held and the heap property intact.
        """
        # Heapq guarantees the head of the heap is in the '0' index.
        heap = self._dispatcher_minheap
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        No-op when the server was constructed without an email handler.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        No-op when the server was constructed without an email handler.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
479
480
if __name__ == '__main__':
    # Launch the frontend: take the log directory from argv, configure
    # logging, then serve an RPMFrontendServer over a multi-threaded
    # XML-RPC server until the process is killed.
    if len(sys.argv) != 2:
        print('Usage: ./%s <log_file_dir>.' % sys.argv[0])
        sys.exit(1)

    log_dir = sys.argv[1]
    email_handler = rpm_logging_config.set_up_logging_to_file(
            log_dir, LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)

    # External clients are assumed to connect via this machine's hostname,
    # so binding to it selects the matching network interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()
501 server.serve_forever()