blob: 499eeb26e059f6e8d131321595600da6f09b0df5 [file] [log] [blame]
Mike Frysingerd03e6b52019-08-03 12:49:01 -04001#!/usr/bin/python2
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Scott Zawalski201d6be2012-09-21 15:56:25 -040010import sys
Simran Basi7498d202012-07-10 15:21:28 -070011import socket
12import threading
13import xmlrpclib
14
Fang Deng71c4b1f2013-05-20 09:55:04 -070015import rpm_logging_config
Simran Basi7498d202012-07-10 15:21:28 -070016from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070019
Dan Shib7610b52014-05-06 11:09:49 -070020import common
Fang Dengf63c9782014-08-13 17:08:19 -070021from autotest_lib.server import frontend
Dan Shib7610b52014-05-06 11:09:49 -070022from autotest_lib.site_utils.rpm_control_system import utils
23
# Load counter given to a dispatcher when it registers, and the sentinel
# counter that marks a heap entry whose dispatcher has been unregistered.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions of the fields inside a dispatcher heap entry, which has the
# form [rpm_count, dispatcher_uri].
RPM_COUNT, DISPATCHER_URI = 0, 1

# Values read from the RPM configuration.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file; the configured name is resolved relative to
# the directory containing this module.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
45
Simran Basi7498d202012-07-10 15:21:28 -070046
class DispatcherDownException(Exception):
    """Signals that the RPMDispatcher picked for a request is down.

    Carries whatever arguments the raiser supplies (the frontend passes the
    URI of the unreachable dispatcher).
    """
49
50
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap of entries of the format
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to its entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us
                      to invalidate the entry in the minheap. A URI is only
                      present while its dispatcher is registered.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm/poe hostnames to the URI of the dispatcher
                    server already assigned to them.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables.

        @param email_handler: Optional handler used by suspend_emails() and
                              resume_emails(); None disables both.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def set_power_via_poe(self, device_hostname, new_state):
        """Sets power state of the device to the requested state via POE.

        @param device_hostname: Hostname of the servo to control.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set servo %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set servo: %s to state: %s',
                     device_hostname, new_state)
        # Each failed attempt unregisters the dispatcher that was tried, so
        # the loop ends with a result or with _queue_once raising
        # RPMInfrastructureException once no dispatcher is left.
        while True:
            # Looked up on every attempt so a refreshed mapping file is
            # honoured on retries.
            powerunit_info = self._get_poe_powerunit_info(device_hostname)
            try:
                return self._queue_once(powerunit_info, new_state)
            except DispatcherDownException:
                # Retry forwarding the request to another dispatcher.
                continue


    def set_power_via_rpm(self, device_hostname, rpm_hostname,
                          rpm_outlet, hydra_hostname, new_state):
        """Sets power state of a device to the requested state via RPM.

        Unlike the special case of POE, powerunit information is not available
        on the RPM server, so must be provided as arguments.

        @param device_hostname: Hostname of the servo to control.
        @param rpm_hostname: Hostname of the RPM to use.
        @param rpm_outlet: The RPM outlet to control.
        @param hydra_hostname: If required, the hydra device to SSH through to
                               get to the RPM.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to.

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s',
                     device_hostname, new_state)

        powerunit_info = utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.RPM,
                powerunit_hostname=rpm_hostname,
                outlet=rpm_outlet,
                hydra_hostname=hydra_hostname,
        )
        # See set_power_via_poe: every failed attempt removes one dispatcher,
        # so this loop terminates.
        while True:
            try:
                return self._queue_once(powerunit_info, new_state)
            except DispatcherDownException:
                # Retry forwarding the request to another dispatcher.
                continue


    def _queue_once(self, powerunit_info, new_state):
        """Queue one request to the dispatcher.

        @param powerunit_info: A PowerUnitInfo instance.
        @param new_state: Already validated state to request.

        @return: Whatever the dispatcher's queue_request call returns.

        @raise RPMInfrastructureException: No dispatcher is registered, or
                                           none of them can be reached.
        @raise DispatcherDownException: The chosen dispatcher could not be
                                        reached and has been unregistered;
                                        the caller may retry.
        """
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # Some socket errors (e.g. timeouts) carry no errno, or one that
            # is missing from errno.errorcode; fall back to the error text
            # instead of raising KeyError from inside this handler.
            error_name = errno.errorcode.get(getattr(er, 'errno', None),
                                             str(er))
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri, error_name)
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            raise DispatcherDownException(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        # Walk a copy: the heap itself may be modified by other request
        # threads while we are blocked on the network calls below.
        for dispatcher_entry in list(self._dispatcher_minheap):
            dispatcher = xmlrpclib.ServerProxy(
                    dispatcher_entry[DISPATCHER_URI], allow_none=True)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo.
        1) ChromeOS dut
        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        # Hostnames ending in 'servo' are treated as POE-managed servos.
        if device_hostname.endswith('servo'):
            return self._get_poe_powerunit_info(device_hostname)
        return self._get_rpm_powerunit_info(device_hostname)


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # A truthy result is the pair (new mtime, new mapping).
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
        if not switch_if_tuple:
            # Interpolate the hostname into the message; passing it as a
            # second exception argument would leave the '%s' unexpanded.
            raise RPMInfrastructureException(
                    'Could not determine POE hostname for %s. '
                    'Please check the servo-interface mapping file.' %
                    device_hostname)
        return utils.PowerUnitInfo(
                device_hostname=device_hostname,
                powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                powerunit_hostname=switch_if_tuple[0],
                outlet=switch_if_tuple[1],
                hydra_hostname=None)


    def _get_rpm_powerunit_info(self, device_hostname):
        """Get the power management unit information for an RPM controller.

        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        A power unit that already has a dispatcher keeps it; otherwise the
        least loaded registered dispatcher is assigned and its load counter
        is incremented.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            assigned_uri = self._rpm_dict.get(powerunit_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            if not self._dispatcher_minheap:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            # Choose the least loaded dispatcher to communicate with the RPM.
            heap_entry = heapq.heappop(self._dispatcher_minheap)
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1. The same
            # list object stays referenced from _entry_dict.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher.
        Registering a URI that is already registered is a no-op, so the heap
        never holds two entries for one dispatcher.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                # A second heap entry could never be invalidated through
                # _entry_dict and would outlive a later unregister.
                logging.info('%s is already registered; keeping its '
                             'existing entry.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            # Remove all RPM mappings to this dispatcher. Done even when the
            # URI turns out not to be registered, so a retry can never be
            # routed back to it. Keys are collected first because a dict
            # must not change size while it is being iterated.
            stale_rpms = [rpm for rpm, dispatcher in self._rpm_dict.items()
                          if dispatcher == uri_to_unregister]
            for rpm in stale_rpms:
                del self._rpm_dict[rpm]
            heap_entry = self._entry_dict.pop(uri_to_unregister, None)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top (or the heap is empty).

        Callers are expected to hold _lock.
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index. Compare
        # by value: identity of equal ints is an implementation detail.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
443
444
if __name__ == '__main__':
    # Main entry point used to launch the frontend server. Creates an
    # instance of RPMFrontendServer and registers it to a
    # MultiThreadedXMLRPCServer instance.
    if len(sys.argv) != 2:
        # The parenthesized single-argument form prints the same text under
        # the Python 2 print statement and the Python 3 print function.
        print('Usage: ./%s <log_file_dir>.' % sys.argv[0])
        sys.exit(1)

    email_handler = rpm_logging_config.set_up_logging_to_file(
            sys.argv[1], LOG_FILENAME_FORMAT)
    frontend_server = RPMFrontendServer(email_handler=email_handler)
    # We assume that external clients will always connect to us via the
    # hostname, so listening on the hostname ensures we pick the right network
    # interface.
    address = socket.gethostname()
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()
465 server.serve_forever()