blob: 44e6b6274e958374ba5c39fccdfc0f96ab2ca93a [file] [log] [blame]
Mike Frysingerd03e6b52019-08-03 12:49:01 -04001#!/usr/bin/python2
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import atexit
7import errno
8import logging
9import re
Scott Zawalski201d6be2012-09-21 15:56:25 -040010import sys
Simran Basi7498d202012-07-10 15:21:28 -070011import socket
12import threading
Simran Basi7498d202012-07-10 15:21:28 -070013import xmlrpclib
14
Fang Deng71c4b1f2013-05-20 09:55:04 -070015import rpm_controller
16import rpm_logging_config
Dan Shib7610b52014-05-06 11:09:49 -070017
Simran Basi7498d202012-07-10 15:21:28 -070018from config import rpm_config
19from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
Simran Basi7498d202012-07-10 15:21:28 -070020from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070021
Dan Shib7610b52014-05-06 11:09:49 -070022import common
23from autotest_lib.site_utils.rpm_control_system import utils
24
# Value of the 'dispatcher_logname_format' option in the GENERAL section of the
# rpm config; handed to rpm_logging_config.start_log_server() at startup.
LOG_FILENAME_FORMAT = rpm_config.get(
        'GENERAL', 'dispatcher_logname_format')
26
27
class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, which will hand outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _port: Port assigned to this server instance.
    @var _worker_dict: Dictionary mapping RPM hostnames to RPMController
                       instances.
    """

    # Matched against the second-to-last '-' separated element of an RPM
    # hostname to recognise web-powered RPMs.
    _WEBPOWERED_RACK_ID = re.compile(r'rack[0-9]+[a-z]+')


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initializes instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        # We assume that the frontend server and dispatchers are running on the
        # same host, and the frontend server is listening for connections from
        # the external world.
        frontend_server_port = rpm_config.getint('RPM_INFRASTRUCTURE',
                                                 'frontend_port')
        self._frontend_server = 'http://%s:%d' % (socket.gethostname(),
                                                  frontend_server_port)
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       self._socket_error_reason(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    @staticmethod
    def _socket_error_reason(er):
        """
        Describes a socket error for use in a log or exception message.

        Not every socket error carries an errno that is present in
        errno.errorcode (timeouts have errno None, address lookup failures use
        their own codes), so a plain errno.errorcode[...] lookup could raise
        KeyError and hide the original failure.

        @param er: socket.error instance that was caught.

        @return: Symbolic errno name (e.g. 'ECONNREFUSED') when er.errno is a
                 known errno code, otherwise str(er).
        """
        return errno.errorcode.get(er.errno, str(er))


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, None if the key is
                 not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, powerunit_info_dict, new_state):
        """
        Looks up the appropriate RPMController instance for the device and queues
        up the request.

        @param powerunit_info_dict: A dictionary, containing the attribute/values
                                    of an unmarshalled PowerUnitInfo instance.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise.
        """
        powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict)
        logging.info('Received request to set device: %s to state: %s',
                     powerunit_info.device_hostname, new_state)
        controller = self._get_rpm_controller(
                powerunit_info.powerunit_hostname,
                powerunit_info.hydra_hostname)
        if controller is None:
            # _get_rpm_controller returns None when no powerunit hostname was
            # supplied; report a failed attempt instead of crashing on None.
            logging.error('No powerunit hostname given for device: %s. Unable '
                          'to set state: %s',
                          powerunit_info.device_hostname, new_state)
            return False
        return controller.queue_request(powerunit_info, new_state)


    def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None):
        """
        Private method that retrieves the appropriate RPMController instance
        for this RPM Hostname or calls _create_rpm_controller if it does not
        already exist.

        @param rpm_hostname: hostname of the RPM whose RPMController we want.
        @param hydra_hostname: hostname forwarded to _create_rpm_controller
                               when a new controller has to be created.

        @return: RPMController instance responsible for this RPM, or None if
                 rpm_hostname is empty.
        """
        if not rpm_hostname:
            return None
        # Look up and create under a single lock acquisition so that two
        # concurrent requests for the same RPM cannot each create their own
        # controller. _worker_dict is accessed directly here because _lock is
        # not reentrant.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if not controller:
                controller = self._create_rpm_controller(
                        rpm_hostname, hydra_hostname)
                self._worker_dict[rpm_hostname] = controller
        return controller


    def _create_rpm_controller(self, rpm_hostname, hydra_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The type is chosen from the second-to-last '-' separated element of
        rpm_hostname, so a hostname without a '-' raises IndexError.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.
        @param hydra_hostname: Hostname passed to SentryRPMController as
                               hydra_hostname; unused for the other types.

        @return: RPMController instance responsible for this RPM.
        """
        hostname_elements = rpm_hostname.split('-')
        rack_id = hostname_elements[-2]
        if rack_id == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            return rpm_controller.CiscoPOEController(rpm_hostname)
        # The device is an RPM.
        if self._WEBPOWERED_RACK_ID.match(rack_id):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(
                hostname=rpm_hostname,
                hydra_hostname=hydra_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       self._socket_error_reason(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
219
220
def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    A temporary socket is bound to port 0 to find a free port; it is always
    closed again, even if creating the server fails.

    @return: server,port - server object and the port on which it is
             listening.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Binding to port 0 lets the OS choose the port number.
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        # The server is created while the temporary socket is still open, as
        # before; only the cleanup is now guaranteed.
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        sock.close()
    return server, port
240
241
if __name__ == '__main__':
    # Script entry point used to launch the dispatch server. Creates an
    # instance of RPMDispatcher and registers it to a
    # MultiThreadedXMLRPCServer instance.
    if len(sys.argv) != 2:
        print('Usage: ./%s <log_file_name>' % sys.argv[0])
        sys.exit(1)
    log_file_name = sys.argv[1]

    rpm_logging_config.start_log_server(log_file_name, LOG_FILENAME_FORMAT)
    rpm_logging_config.set_up_logging_to_server()

    # Resolve the local ip address; the dispatcher is constructed with it
    # together with the port the xmlrpc server was launched on.
    address = socket.gethostbyname(socket.gethostname())
    server, port = launch_server_on_unused_port()
    server.register_instance(RPMDispatcher(address, port))
    server.serve_forever()