blob: 794c2b6f7893aebf9a796f7394a5b910fb32e0a5 [file] [log] [blame]
#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Scott Zawalski201d6be2012-09-21 15:56:25 -040010import sys
Simran Basi7498d202012-07-10 15:21:28 -070011import socket
12import threading
13import xmlrpclib
14
Fang Deng71c4b1f2013-05-20 09:55:04 -070015import rpm_logging_config
Simran Basi7498d202012-07-10 15:21:28 -070016from config import rpm_config
17from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
18from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070019
Dan Shib7610b52014-05-06 11:09:49 -070020import common
Fang Dengf63c9782014-08-13 17:08:19 -070021from autotest_lib.server import frontend
Dan Shib7610b52014-05-06 11:09:49 -070022from autotest_lib.site_utils.rpm_control_system import utils
23
# RPM count given to a freshly registered dispatcher, and the sentinel count
# written into a heap entry once its dispatcher has been unregistered.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions of the fields inside a heap entry: [rpm_count, dispatcher_uri].
RPM_COUNT = 0
DISPATCHER_URI = 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client is allowed to request.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Servo-interface mapping file, resolved relative to this module's directory.
MAPPING_FILE = os.path.join(
        os.path.dirname(__file__),
        rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))

# Size of the LRU that holds power management unit information related
# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
45
Simran Basi7498d202012-07-10 15:21:28 -070046
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their power state requests to this central server who will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server who will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap that returns a list of format-
                              [ num_rpm's, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to an entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us
                      to invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostname's to an already assigned dispatcher
                    server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to
                           (switch_hostname, interface).
    @var _rpm_info: An LRU cache to hold recently visited rpm information
                    so that we don't hit AFE too often. The elements in
                    the cache are instances of PowerUnitInfo indexed by
                    dut hostnames. POE info is not stored in the cache.
    @var _afe: AFE instance to talk to autotest. Used to retrieve rpm
               hostname.
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables.

        @param email_handler: Optional handler used by suspend_emails and
                              resume_emails. When None those calls are
                              no-ops.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._rpm_dict = {}
        self._afe = frontend.AFE()
        self._rpm_info = utils.LRUCache(size=LRU_SIZE)
        self._email_handler = email_handler


    def queue_request(self, device_hostname, new_state):
        """
        Forwards a request to change a device's (a dut or a servo) power state
        to the appropriate dispatcher server.

        This call will block until the forwarded request returns.

        @param device_hostname: Hostname of the device whose power state we
                                want to change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          device's outlet to. Matching is case-insensitive.

        @return: True if the attempt to change power state was successful,
                 False otherwise (including when new_state is not valid).

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the
        # hostname.
        device_hostname = device_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set device %s to invalid '
                          'state %s', device_hostname, new_state)
            return False
        logging.info('Received request to set device: %s to state: %s',
                     device_hostname, new_state)
        powerunit_info = self._get_powerunit_info(device_hostname)
        dispatcher_uri = self._get_dispatcher(powerunit_info)
        if not dispatcher_uri:
            # No dispatchers available.
            raise RPMInfrastructureException('No dispatchers available.')
        client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
        try:
            # Block on the request and return the result once it arrives.
            return client.queue_request(powerunit_info, new_state)
        except socket.error as er:
            # Dispatcher Server is not reachable. Unregister it and retry.
            # Not every socket.error carries a standard errno (e.g. timeouts
            # or name-resolution failures), so fall back to the exception
            # itself instead of raising KeyError while logging.
            logging.error("Can't reach Dispatch Server: %s. Error: %s",
                          dispatcher_uri,
                          errno.errorcode.get(er.errno, er))
            if self.is_network_infrastructure_down():
                # No dispatchers can handle this request so raise an Exception
                # to the caller.
                raise RPMInfrastructureException('No dispatchers can be '
                                                 'reached.')
            logging.info('Will attempt forwarding request to other dispatch '
                         'servers.')
            logging.error('Unregistering %s due to error. Recommend resetting '
                          'that dispatch server.', dispatcher_uri)
            self.unregister_dispatcher(dispatcher_uri)
            # Retry forwarding the request.
            return self.queue_request(device_hostname, new_state)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed.

        @return: False if any dispatcher server is up and the rpm
                 infrastructure can still function. True otherwise.
        """
        # Snapshot the URIs under the lock so that concurrent register or
        # unregister calls cannot mutate the heap while we walk it; the
        # (potentially slow) network probes happen outside the lock.
        with self._lock:
            dispatcher_uris = [entry[DISPATCHER_URI]
                               for entry in self._dispatcher_minheap]
        for uri in dispatcher_uris:
            dispatcher = xmlrpclib.ServerProxy(uri, allow_none=True)
            try:
                if dispatcher.is_up():
                    # Atleast one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_powerunit_info(self, device_hostname):
        """Get the power management unit information for a device.

        A device could be a chromeos dut or a servo.
        1) ChromeOS dut
        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        2) Servo
        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.
                                Hostnames ending in 'servo' are treated as
                                servos, everything else as a dut.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        if device_hostname.endswith('servo'):
            return self._get_poe_powerunit_info(device_hostname)
        return self._get_rpm_powerunit_info(device_hostname)


    def _get_poe_powerunit_info(self, device_hostname):
        """Get the power management unit information for a POE controller.

        Servo is managed by POE. The related information we need to know
        include poe hostname, poe interface. Such information is
        stored in a local file and read into memory.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # Pick up edits to the mapping file without restarting the
            # server; a falsy result means the cached mapping is still
            # current.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                (self._mapping_last_modified,
                 self._servo_interface) = reload_info
            switch_if_tuple = self._servo_interface.get(device_hostname)
            if not switch_if_tuple:
                # Interpolate the hostname into the message; passing it as a
                # separate exception argument leaves the '%s' unformatted.
                raise RPMInfrastructureException(
                        'Could not determine POE hostname for %s. '
                        'Please check the servo-interface mapping file.'
                        % device_hostname)
            return utils.PowerUnitInfo(
                    device_hostname=device_hostname,
                    powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
                    powerunit_hostname=switch_if_tuple[0],
                    outlet=switch_if_tuple[1],
                    hydra_hostname=None)


    def _get_rpm_powerunit_info(self, device_hostname):
        """Get the power management unit information for an RPM controller.

        Chromeos dut is managed by RPM. The related information
        we need to know include rpm hostname, rpm outlet, hydra hostname.
        Such information can be retrieved from afe_host_attributes table
        from afe. A local LRU cache is used avoid hitting afe too often.

        @param device_hostname: A string representing the device's hostname.

        @returns: A PowerUnitInfo object.
        @raises RPMInfrastructureException if failed to get the power
                unit info.

        """
        with self._lock:
            # Regular DUTs are managed by RPMs.
            if device_hostname in self._rpm_info:
                return self._rpm_info[device_hostname]
            hosts = self._afe.get_hosts(hostname=device_hostname)
            if not hosts:
                raise RPMInfrastructureException(
                        'Can not retrieve rpm information '
                        'from AFE for %s, no host found.' % device_hostname)
            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
            # Remember the answer so later requests skip the AFE lookup.
            self._rpm_info[device_hostname] = info
            return info


    def _get_dispatcher(self, powerunit_info):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given RPM/POE.

        An existing assignment is reused; otherwise the least loaded
        dispatcher is taken from the min heap and its count is bumped.

        @param powerunit_info: A PowerUnitInfo instance.

        @return: URI of dispatcher server responsible for the rpm/poe.
                 None if no dispatcher servers are available.
        """
        powerunit_type = powerunit_info.powerunit_type
        powerunit_hostname = powerunit_info.powerunit_hostname
        with self._lock:
            assigned_uri = self._rpm_dict.get(powerunit_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for %s %s.',
                         powerunit_type, powerunit_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1. The same
            # list object stays referenced by _entry_dict.
            heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for %s %s', dispatcher_uri,
                         powerunit_type, powerunit_hostname)
            self._rpm_dict[powerunit_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher.
        Registering a URI that is already registered is a no-op: a second
        heap entry for the same URI could never be invalidated by
        unregister_dispatcher, because _entry_dict tracks only one entry per
        URI.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                logging.warning('%s is already registered; ignoring the '
                                'duplicate registration.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Assigns an rpm count of -1 to this dispatcher so that it will be pushed
        out of the min heap.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPM's can be reassigned to a new dispatcher.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            heap_entry = self._entry_dict.pop(uri_to_unregister, None)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            # Set this entry's RPM_COUNT to TERMINATED (-1).
            heap_entry[RPM_COUNT] = TERMINATED
            # Remove all RPM mappings. Collect the keys first so the dict is
            # not resized while it is being iterated.
            orphaned_rpms = [rpm for rpm, dispatcher in self._rpm_dict.items()
                             if dispatcher == uri_to_unregister]
            for rpm in orphaned_rpms:
                del self._rpm_dict[rpm]
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Peek at the head of the heap and keep popping off values until there is
        a non-terminated dispatcher at the top (or the heap is empty).
        """
        heap = self._dispatcher_minheap
        # Heapq guarantees the head of the heap is in the '0' index. Compare
        # by value: identity checks on ints only work by implementation
        # accident.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        Does nothing when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications.

        Does nothing when no email handler was supplied.
        """
        if self._email_handler:
            self._email_handler.resume_emails()
393
394
if __name__ == '__main__':
    # Script entry point: launch the frontend server by creating an
    # RPMFrontendServer and registering it with a MultiThreadedXMLRPCServer.
    if len(sys.argv) != 2:
        print('Usage: ./%s <log_file_dir>.' % sys.argv[0])
        sys.exit(1)

    log_dir = sys.argv[1]
    notifier = rpm_logging_config.set_up_logging_to_file(
            log_dir, LOG_FILENAME_FORMAT)
    rpm_frontend = RPMFrontendServer(email_handler=notifier)
    # We assume that external clients will always connect to us via the
    # hostname, so listening on the hostname ensures we pick the right network
    # interface.
    listen_host = socket.gethostname()
    listen_port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    xmlrpc_server = MultiThreadedXMLRPCServer((listen_host, listen_port),
                                              allow_none=True)
    xmlrpc_server.register_instance(rpm_frontend)
    logging.info('Listening on %s port %d', listen_host, listen_port)
    xmlrpc_server.serve_forever()
415 server.serve_forever()