[autotest] rpm frontend server retrieves rpm/outlet/hydra from afe
This is the 2nd CL to make rpm infra read rpm/outlet/hydra info from
afe.
The major change of this CL is to add a new method
_get_powerunit_info(...)
For ChromeOS DUTs, it first looks up the rpm/outlet/hydra information
for the DUT in its local LRU cache. If the information is not found in
the local cache, it is retrieved from AFE.
For servos, it loads the POE hostname/interface info from a local file.
The method returns a PowerUnitInfo instance, which
will be passed to rpm_dispatcher.
CQ-DEPEND=CL:212349
BUG=chromium:392548
TEST=unittest.
Integration test with the other CLs in this series: set up the
rpm server locally and change host states.
Change-Id: I63326f8b58083ebf807105d94c919bf5fd5f7cea
Reviewed-on: https://chromium-review.googlesource.com/212346
Reviewed-by: Simran Basi <sbasi@chromium.org>
Reviewed-by: Dan Shi <dshi@chromium.org>
Tested-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Fang Deng <fdeng@chromium.org>
diff --git a/site_utils/rpm_control_system/frontend_server.py b/site_utils/rpm_control_system/frontend_server.py
index ef6b930..6ec33d4 100755
--- a/site_utils/rpm_control_system/frontend_server.py
+++ b/site_utils/rpm_control_system/frontend_server.py
@@ -7,7 +7,6 @@
import heapq
import logging
import os
-import re
import sys
import socket
import threading
@@ -19,6 +18,7 @@
from rpm_infrastructure_exception import RPMInfrastructureException
import common
+from autotest_lib.server import frontend
from autotest_lib.site_utils.rpm_control_system import utils
DEFAULT_RPM_COUNT = 0
@@ -34,19 +34,20 @@
# Valid state values.
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']
-# RPM Hostname regex.
-RPM_REGEX = re.compile('host[^.]*')
-
# Servo-interface mapping file
MAPPING_FILE = os.path.join(
os.path.dirname(__file__),
rpm_config.get('CiscoPOE', 'servo_interface_mapping_file'))
+# Size of the LRU that holds power management unit information related
+# to a device, e.g. rpm_hostname, outlet, hydra_hostname, etc.
+LRU_SIZE = rpm_config.getint('RPM_INFRASTRUCTURE', 'lru_size')
+
class RPMFrontendServer(object):
"""
This class is the frontend server of the RPM Infrastructure. All clients
- will send their DUT power state requests to this central server who will
+ will send their power state requests to this central server who will
forward the requests to an avaliable or already assigned RPM dispatcher
server.
@@ -68,6 +69,11 @@
@var _mapping_last_modified: Last-modified time of the servo-interface
mapping file.
@var _servo_interface: Maps servo hostname to (switch_hostname, interface).
+ @var _rpm_info: An LRU cache to hold recently visited rpm information
+ so that we don't hit AFE too often. The elements in
+ the cache are instances of PowerUnitInfo indexed by
+ dut hostnames. POE info is not stored in the cache.
+ @var _afe: AFE instance to talk to autotest. Used to retrieve rpm hostname.
@var _email_handler: Email handler to use to control email notifications.
"""
@@ -81,23 +87,25 @@
self._dispatcher_minheap = []
self._entry_dict = {}
self._lock = threading.Lock()
- self._rpm_dict = {}
self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
self._servo_interface = utils.load_servo_interface_mapping()
+ self._rpm_dict = {}
+ self._afe = frontend.AFE()
+ self._rpm_info = utils.LRUCache(size=LRU_SIZE)
self._email_handler = email_handler
- def queue_request(self, dut_hostname, new_state):
+ def queue_request(self, device_hostname, new_state):
"""
- Forwards a request to change a DUT's power state to the appropriate
- dispatcher server.
+ Forwards a request to change a device's (a dut or a servo) power state
+ to the appropriate dispatcher server.
This call will block until the forwarded request returns.
- @param dut_hostname: Hostname of the DUT whose power state we want to
+ @param device_hostname: Hostname of the device whose power state we want to
change.
@param new_state: [ON, OFF, CYCLE] State to which we want to set the
- DUT's outlet to.
+ device's outlet to.
@return: True if the attempt to change power state was successful,
False otherwise.
@@ -105,25 +113,25 @@
@raise RPMInfrastructureException: No dispatchers are available or can
be reached.
"""
- # Remove any DNS Zone information and simplify down to just the dut
- # hostname.
- dut_hostname = dut_hostname.split('.')[0]
+ # Remove any DNS Zone information and simplify down to just the hostname.
+ device_hostname = device_hostname.split('.')[0]
new_state = new_state.upper()
# Put new_state in all uppercase letters
if new_state not in VALID_STATE_VALUES:
- logging.error('Received request to set DUT: %s to invalid state %s',
- dut_hostname, new_state)
+ logging.error('Received request to set device %s to invalid '
+ 'state %s', device_hostname, new_state)
return False
- logging.info('Received request to set DUT: %s to state: %s',
- dut_hostname, new_state)
- dispatcher_uri = self._get_dispatcher(dut_hostname)
+ logging.info('Received request to set device: %s to state: %s',
+ device_hostname, new_state)
+ powerunit_info = self._get_powerunit_info(device_hostname)
+ dispatcher_uri = self._get_dispatcher(powerunit_info)
if not dispatcher_uri:
# No dispatchers available.
raise RPMInfrastructureException('No dispatchers available.')
- client = xmlrpclib.ServerProxy(dispatcher_uri)
+ client = xmlrpclib.ServerProxy(dispatcher_uri, allow_none=True)
try:
# Block on the request and return the result once it arrives.
- return client.queue_request(dut_hostname, new_state)
+ return client.queue_request(powerunit_info, new_state)
except socket.error as er:
# Dispatcher Server is not reachable. Unregister it and retry.
logging.error("Can't reach Dispatch Server: %s. Error: %s",
@@ -139,7 +147,7 @@
'that dispatch server.', dispatcher_uri)
self.unregister_dispatcher(dispatcher_uri)
# Retry forwarding the request.
- return self.queue_request(dut_hostname, new_state)
+ return self.queue_request(device_hostname, new_state)
def is_network_infrastructure_down(self):
@@ -153,7 +161,8 @@
can still function. True otherwise.
"""
for dispatcher_entry in self._dispatcher_minheap:
- dispatcher = xmlrpclib.ServerProxy(dispatcher_entry[DISPATCHER_URI])
+ dispatcher = xmlrpclib.ServerProxy(
+ dispatcher_entry[DISPATCHER_URI], allow_none=True)
try:
if dispatcher.is_up():
# Atleast one dispatcher is alive so our network is fine.
@@ -166,41 +175,84 @@
return True
- def _get_dispatcher(self, dut_hostname):
+    def _get_powerunit_info(self, device_hostname):
+        """Get the power management unit information for a device.
+
+        A device could be a chromeos dut or a servo.
+        1) ChromeOS dut
+           A ChromeOS dut is managed by an RPM. The related information
+           includes the rpm hostname, rpm outlet and hydra hostname.
+           It is retrieved from the afe_host_attributes table via the
+           AFE; a local LRU cache is used to avoid hitting AFE too often.
+
+        2) Servo
+           A servo is managed by a POE switch. The related information
+           includes the poe hostname and poe interface. It is read from
+           the local servo-interface mapping file (reloaded if its
+           last-modified time changed). POE info is not put in the cache.
+
+        @param device_hostname: A string representing the device's hostname.
+
+        @returns: A PowerUnitInfo object.
+        @raises RPMInfrastructureException if failed to get the power
+                unit info.
+        """
+        # NOTE(review): the AFE query below (presumably an RPC) runs while
+        # holding self._lock, which _get_dispatcher also takes; confirm OK.
+        with self._lock:
+            if device_hostname.endswith('servo'):
+                # Servos are managed by Cisco POE switches.
+                reload_info = utils.reload_servo_interface_mapping_if_necessary(
+                        self._mapping_last_modified)
+                if reload_info:
+                    self._mapping_last_modified, self._servo_interface = reload_info
+                switch_if_tuple = self._servo_interface.get(device_hostname)
+                if not switch_if_tuple:
+                    # Format eagerly: exception constructors do not apply
+                    # logging-style lazy %-arguments.
+                    raise RPMInfrastructureException(
+                            'Could not determine POE hostname for %s. '
+                            'Please check the servo-interface mapping file.' %
+                            device_hostname)
+                return utils.PowerUnitInfo(
+                        device_hostname=device_hostname,
+                        powerunit_type=utils.PowerUnitInfo.POWERUNIT_TYPES.POE,
+                        powerunit_hostname=switch_if_tuple[0],
+                        outlet=switch_if_tuple[1],
+                        hydra_hostname=None)
+            # Regular DUTs are managed by RPMs.
+            if device_hostname in self._rpm_info:
+                return self._rpm_info[device_hostname]
+            hosts = self._afe.get_hosts(hostname=device_hostname)
+            if not hosts:
+                raise RPMInfrastructureException(
+                        'Can not retrieve rpm information '
+                        'from AFE for %s, no host found.' % device_hostname)
+            info = utils.PowerUnitInfo.get_powerunit_info(hosts[0])
+            self._rpm_info[device_hostname] = info
+            return info
+
+
+ def _get_dispatcher(self, powerunit_info):
"""
Private method that looks up or assigns a dispatcher server
- responsible for communicating with the given DUT's RPM.
+ responsible for communicating with the given RPM/POE.
Will also call _check_dispatcher to make sure it is up before returning
it.
- @param dut_hostname: Hostname of the DUT whose dispatcher URI we want
- to retrieve.
+ @param powerunit_info: A PowerUnitInfo instance.
- @return: URI of dispatcher server responsible for this DUT's rpm. None
- if no dispatcher servers are available.
+ @return: URI of dispatcher server responsible for the rpm/poe.
+ None if no dispatcher servers are available.
"""
- if dut_hostname.endswith('servo'):
- # Servos are managed by Cisco POE switches.
- reload_info = utils.reload_servo_interface_mapping_if_necessary(
- self._mapping_last_modified)
- if reload_info:
- self._mapping_last_modified, self._servo_interface = reload_info
- switch_if_tuple = self._servo_interface.get(dut_hostname)
- if not switch_if_tuple:
- logging.error('Could not determine POE hostname for %s. '
- 'Please check the servo-interface mapping file.',
- dut_hostname)
- return None
- else:
- rpm_hostname = switch_if_tuple[0]
- else:
- # Regular DUTs are managed by RPMs.
- rpm_hostname = RPM_REGEX.sub(DEFAULT_RPM_ID, dut_hostname, count=1)
+ powerunit_type = powerunit_info.powerunit_type
+ powerunit_hostname = powerunit_info.powerunit_hostname
with self._lock:
- if self._rpm_dict.get(rpm_hostname):
- return self._rpm_dict[rpm_hostname]
- logging.info('No Dispatcher assigned for RPM %s.', rpm_hostname)
+ if self._rpm_dict.get(powerunit_hostname):
+ return self._rpm_dict[powerunit_hostname]
+ logging.info('No Dispatcher assigned for %s %s.',
+ powerunit_type, powerunit_hostname)
# Choose the least loaded dispatcher to communicate with the RPM.
try:
heap_entry = heapq.heappop(self._dispatcher_minheap)
@@ -213,9 +265,9 @@
# Put this entry back in the heap with an RPM Count + 1.
heap_entry[RPM_COUNT] = heap_entry[RPM_COUNT] + 1
heapq.heappush(self._dispatcher_minheap, heap_entry)
- logging.info('Assigning %s for RPM %s', dispatcher_uri,
- rpm_hostname)
- self._rpm_dict[rpm_hostname] = dispatcher_uri
+ logging.info('Assigning %s for %s %s', dispatcher_uri,
+ powerunit_type, powerunit_hostname)
+ self._rpm_dict[powerunit_hostname] = dispatcher_uri
return dispatcher_uri