blob: acc0c72899345bbd68abaf7e908b2933db2ba203 [file] [log] [blame]
Scott Zawalski201d6be2012-09-21 15:56:25 -04001#!/usr/bin/python
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Scott Zawalski201d6be2012-09-21 15:56:25 -040010import sys
Simran Basi7498d202012-07-10 15:21:28 -070011import socket
12import threading
13import xmlrpclib
14
Fang Deng71c4b1f2013-05-20 09:55:04 -070015import rpm_logging_config
Simran Basi7498d202012-07-10 15:21:28 -070016from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070019
Dan Shib7610b52014-05-06 11:09:49 -070020import common
Fang Dengf63c9782014-08-13 17:08:19 -070021from autotest_lib.server import frontend
Dan Shib7610b52014-05-06 11:09:49 -070022from autotest_lib.site_utils.rpm_control_system import utils
23
# Load assigned to a freshly registered dispatcher, and the sentinel load
# written into a heap entry when its dispatcher is unregistered.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions of the fields inside a dispatcher heap entry.
RPM_COUNT = 0
DISPATCHER_URI = 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = [
    'ON',
    'OFF',
    'CYCLE',
]

# Servo-interface mapping file, resolved relative to this module's directory.
MAPPING_FILE = os.path.join(
    os.path.dirname(__file__),
    rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'),
)

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
45
Simran Basi7498d202012-07-10 15:21:28 -070046
class DispatcherDownException(Exception):
    """Signals that one specific RPMDispatcher could not be reached."""
49
50
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap that returns a list of format-
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                     heap. If a dispatcher server shuts down this allows us to
                     invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables.

        @param email_handler: Optional handler that suspend_emails() and
                              resume_emails() delegate to; when None those
                              calls do nothing.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def _normalize_state(self, device_kind, device_hostname, new_state):
        """Upper-case a requested power state and check that it is valid.

        Logs the request (or the rejection) so every public entry point
        reports incoming requests the same way.

        @param device_kind: Word used for the device in the log messages,
                            e.g. 'servo' or 'device'.
        @param device_hostname: Hostname of the device, used for logging only.
        @param new_state: Requested state in any letter case.

        @return: The upper-cased state, or None if it is not one of
                 VALID_STATE_VALUES.
        """
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set %s %s to invalid '
                          'state %s', device_kind, device_hostname, new_state)
            return None
        logging.info('Received request to set %s: %s to state: %s',
                     device_kind, device_hostname, new_state)
        return new_state


    def _queue_with_retry(self, powerunit_info, new_state):
        """Forward a request, moving to another dispatcher when one is down.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Already validated state to request.

        @return: Whatever the dispatcher returned for the request.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        while True:
            try:
                return self._queue_once(powerunit_info, new_state)
            except DispatcherDownException:
                # _queue_once unregistered the unreachable dispatcher before
                # raising, so the next attempt is assigned a different one
                # (or raises RPMInfrastructureException once none are left).
                continue


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = self._normalize_state('servo', device_hostname, new_state)
        if new_state is None:
            return False
        powerunit_info = self._get_poe_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the device to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Validate exactly like the other entry points so an invalid or
        # lower-case state is never forwarded to a dispatcher as-is.
        new_state = self._normalize_state('device', device_hostname, new_state)
        if new_state is None:
            return False
        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        return self._queue_with_retry(powerunit_info, new_state)


    def queue_request(self, device_hostname, new_state):
        """
        Forwards a request to change a device's (a dut or a servo) power state
        to the appropriate dispatcher server.

        This call will block until the forwarded request returns.

        @param device_hostname: Hostname of the device whose power state we want
                                to change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = self._normalize_state('device', device_hostname, new_state)
        if new_state is None:
            return False
        powerunit_info = self._get_powerunit_info(device_hostname)
        return self._queue_with_retry(powerunit_info, new_state)


    def _queue_once(self, powerunit_info, new_state):
        """Queue one request to the dispatcher.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: State to request from the dispatcher.

        @return: The result returned by the dispatcher's queue_request call.

        @raise RPMInfrastructureException: No dispatcher is registered, or
                                           none of them can be reached.
        @raise DispatcherDownException: The assigned dispatcher could not be
                                        reached and has been unregistered; the
                                        caller may retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # er.errno is not always a key of errno.errorcode (it is None
            # when the exception carries only a message, e.g. a timeout), so
            # fall back to the exception itself instead of raising KeyError
            # from inside the error handler.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri,
                          errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        for dispatcher_entry in self._dispatcher_minheap:
            dispatcher = xmlrpclib.ServerProxy(
                    dispatcher_entry[DISPATCHER_URI], allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo.
        1) ChromeOS dut
        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.
                                A hostname ending in 'servo' is treated as a
                                servo, anything else as a dut.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        if device_hostname.endswith('servo'):
            return self._get_poe_powerunit_info(device_hostname)
        return self._get_rpm_powerunit_info(device_hostname)


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # Pick up edits to the mapping file before looking the servo up.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
            if not switch_if_tuple:
                # The hostname must be interpolated here: exceptions do not
                # apply %-formatting to extra constructor arguments.
                raise RPMInfrastructureException(
                        'Could not determine POE hostname for %s. '
                        'Please check the servo-interface mapping file.' %
                        device_hostname)
            return utils.PowerUnitInfo(
                    device_hostname=device_hostname,
                    powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                    powerunit_hostname=switch_if_tuple[0],
                    outlet=switch_if_tuple[1],
                    hydra_hostname=None)


    def _get_rpm_powerunit_info(self, device_hostname):
        """Get the power management unit information for an RPM controller.

        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        A power unit that already has a dispatcher keeps it; otherwise the
        least loaded registered dispatcher is assigned and its load is
        incremented.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            if self._rpm_dict.get(powerunit_hostname):
                return self._rpm_dict[powerunit_hostname]
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A URI
        that is already registered keeps its existing entry.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                # A second heap entry for the same URI could never be
                # invalidated through _entry_dict, which only tracks one
                # entry per URI, so it would linger in the heap after the
                # dispatcher is unregistered.
                logging.info('%s is already registered; keeping its existing '
                             'entry.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.get(uri_to_unregister)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. The keys are collected first so the
            # dict is not resized while it is being iterated.
            orphaned = [rpm for rpm, dispatcher in self._rpm_dict.items()
                        if dispatcher == uri_to_unregister]
            for rpm in orphaned:
                del self._rpm_dict[rpm]
            del self._entry_dict[uri_to_unregister]
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top, or the heap is empty.
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index. The count
        # is compared by value; identity comparison of ints only works by
        # accident of the interpreter.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
471
472
if __name__ == '__main__':
    # Entry point used to launch the frontend server. Creates an instance of
    # RPMFrontendServer and registers it to a MultiThreadedXMLRPCServer
    # instance. Expects exactly one argument: the directory for log files.
    if len(sys.argv) != 2:
        # Parenthesised form prints the same text under Python 2 and also
        # parses as a function call on newer interpreters.
        print('Usage: ./%s <log_file_dir>.' % sys.argv[0])
        sys.exit(1)

    email_handler = rpm_logging_config.set_up_logging_to_file(
            sys.argv[1], LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)
    # We assume that external clients will always connect to us via the
    # hostname, so listening on the hostname ensures we pick the right network
    # interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()