blob: ef6b930cf812c9b8a95609294282da7b68142e38 [file] [log] [blame]
Scott Zawalski201d6be2012-09-21 15:56:25 -04001#!/usr/bin/python
Simran Basi7498d202012-07-10 15:21:28 -07002# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import errno
7import heapq
8import logging
Fang Deng71c4b1f2013-05-20 09:55:04 -07009import os
Simran Basi7498d202012-07-10 15:21:28 -070010import re
Scott Zawalski201d6be2012-09-21 15:56:25 -040011import sys
Simran Basi7498d202012-07-10 15:21:28 -070012import socket
13import threading
14import xmlrpclib
15
Fang Deng71c4b1f2013-05-20 09:55:04 -070016import rpm_logging_config
Simran Basi7498d202012-07-10 15:21:28 -070017from config import rpm_config
18from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
19from rpm_infrastructure_exception import RPMInfrastructureException
Simran Basi7498d202012-07-10 15:21:28 -070020
Dan Shib7610b52014-05-06 11:09:49 -070021import common
22from autotest_lib.site_utils.rpm_control_system import utils
23
# Heap bookkeeping: the RPM count a freshly registered dispatcher starts with,
# and the sentinel count that marks an unregistered dispatcher.
DEFAULT_RPM_COUNT = 0
TERMINATED = -1

# Positions inside each [rpm_count, dispatcher_uri] min-heap entry.
RPM_COUNT = 0
DISPATCHER_URI = 1

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'frontend_logname_format')
DEFAULT_RPM_ID = rpm_config.get('RPM_INFRASTRUCTURE', 'default_rpm_id')

# Power states a client may request (checked after upper-casing the request).
VALID_STATE_VALUES = ['ON', 'OFF', 'CYCLE']

# Matches 'host' plus the rest of its dot-free run; the first match in a DUT
# hostname is replaced with DEFAULT_RPM_ID to derive that DUT's RPM hostname.
RPM_REGEX = re.compile(r'host[^.]*')

# Servo-interface mapping file, resolved relative to this module's directory.
MAPPING_FILE = os.path.join(os.path.dirname(__file__),
                            rpm_config.get('CiscoPOE',
                                           'servo_interface_mapping_file'))
Fang Deng71c4b1f2013-05-20 09:55:04 -070044
Simran Basi7498d202012-07-10 15:21:28 -070045
class RPMFrontendServer(object):
    """
    This class is the frontend server of the RPM Infrastructure. All clients
    will send their DUT power state requests to this central server which will
    forward the requests to an available or already assigned RPM dispatcher
    server.

    Once the dispatcher processes the request it will return the result
    to this frontend server which will send the result back to the client.

    All calls to this server are blocking.

    @var _dispatcher_minheap: Min heap of entries of format-
                              [ num_rpms, dispatcher_uri ]
                              Used to choose the least loaded dispatcher.
    @var _entry_dict: Maps dispatcher URI to its entry (list) inside the min
                      heap. If a dispatcher server shuts down this allows us to
                      invalidate the entry in the minheap.
    @var _lock: Used to protect data from multiple running threads all
                manipulating the same data.
    @var _rpm_dict: Maps rpm hostnames to the URI of an already assigned
                    dispatcher server.
    @var _mapping_last_modified: Last-modified time of the servo-interface
                                 mapping file.
    @var _servo_interface: Maps servo hostname to (switch_hostname, interface).
    @var _email_handler: Email handler to use to control email notifications.
    """


    def __init__(self, email_handler=None):
        """
        RPMFrontendServer constructor.

        Initializes instance variables and loads the servo-interface mapping.

        @param email_handler: Handler that suspend_emails()/resume_emails()
                              forward to. None makes those calls no-ops.
        """
        self._dispatcher_minheap = []
        self._entry_dict = {}
        self._lock = threading.Lock()
        self._rpm_dict = {}
        self._mapping_last_modified = os.path.getmtime(MAPPING_FILE)
        self._servo_interface = utils.load_servo_interface_mapping()
        self._email_handler = email_handler


    def queue_request(self, dut_hostname, new_state):
        """
        Forwards a request to change a DUT's power state to the appropriate
        dispatcher server.

        This call will block until the forwarded request returns. If the
        chosen dispatcher cannot be reached it is unregistered and the
        request is retried on another dispatcher.

        @param dut_hostname: Hostname of the DUT whose power state we want to
                             change.
        @param new_state: [ON, OFF, CYCLE] State to which we want to set the
                          DUT's outlet to (case-insensitive).

        @return: True if the attempt to change power state was successful,
                 False otherwise.

        @raise RPMInfrastructureException: No dispatchers are available or can
                                           be reached.
        """
        # Remove any DNS Zone information and simplify down to just the dut
        # hostname.
        dut_hostname = dut_hostname.split('.')[0]
        # Put new_state in all uppercase letters.
        new_state = new_state.upper()
        if new_state not in VALID_STATE_VALUES:
            logging.error('Received request to set DUT: %s to invalid state %s',
                          dut_hostname, new_state)
            return False
        logging.info('Received request to set DUT: %s to state: %s',
                     dut_hostname, new_state)
        # Each failed attempt unregisters the unreachable dispatcher before
        # retrying, so retries normally go to a different dispatcher until
        # one answers or none remain (then _get_dispatcher returns None).
        while True:
            dispatcher_uri = self._get_dispatcher(dut_hostname)
            if not dispatcher_uri:
                # No dispatchers available.
                raise RPMInfrastructureException('No dispatchers available.')
            client = xmlrpclib.ServerProxy(dispatcher_uri)
            try:
                # Block on the request and return the result once it arrives.
                return client.queue_request(dut_hostname, new_state)
            except socket.error as er:
                # er.errno can be None (e.g. timeouts) or a non-errno code
                # (e.g. getaddrinfo errors); don't let a KeyError hide the
                # real socket failure.
                error_name = errno.errorcode.get(er.errno, str(er))
                logging.error("Can't reach Dispatch Server: %s. Error: %s",
                              dispatcher_uri, error_name)
                if self.is_network_infrastructure_down():
                    # No dispatchers can handle this request so raise an
                    # Exception to the caller.
                    raise RPMInfrastructureException('No dispatchers can be '
                                                     'reached.')
                logging.info('Will attempt forwarding request to other '
                             'dispatch servers.')
                logging.error('Unregistering %s due to error. Recommend '
                              'resetting that dispatch server.',
                              dispatcher_uri)
                self.unregister_dispatcher(dispatcher_uri)


    def is_network_infrastructure_down(self):
        """
        Check to see if we can communicate with any dispatcher servers.

        Only called in the situation that queuing a request to a dispatcher
        server failed. The registered URIs are copied under the lock; the
        network probes themselves run without holding it.

        @return: False if any dispatcher server is up and the rpm infrastructure
                 can still function. True otherwise.
        """
        with self._lock:
            dispatcher_uris = [entry[DISPATCHER_URI]
                               for entry in self._dispatcher_minheap]
        for dispatcher_uri in dispatcher_uris:
            dispatcher = xmlrpclib.ServerProxy(dispatcher_uri)
            try:
                if dispatcher.is_up():
                    # At least one dispatcher is alive so our network is fine.
                    return False
            except socket.error:
                # Can't talk to this dispatcher so keep looping.
                pass
        logging.error("Can't reach any dispatchers. Check frontend network "
                      'status or all dispatchers are down.')
        return True


    def _get_dispatcher(self, dut_hostname):
        """
        Private method that looks up or assigns a dispatcher server
        responsible for communicating with the given DUT's RPM.

        Servo hostnames are resolved to their POE switch through the
        servo-interface mapping (reloaded if the file changed); other DUTs
        are resolved to an RPM hostname via RPM_REGEX. A new RPM is assigned
        to the least loaded dispatcher in the min heap.

        @param dut_hostname: Hostname of the DUT whose dispatcher URI we want
                             to retrieve.

        @return: URI of dispatcher server responsible for this DUT's rpm. None
                 if no dispatcher servers are available or the servo's switch
                 cannot be determined.
        """
        if dut_hostname.endswith('servo'):
            # Servos are managed by Cisco POE switches.
            reload_info = utils.reload_servo_interface_mapping_if_necessary(
                    self._mapping_last_modified)
            if reload_info:
                self._mapping_last_modified, self._servo_interface = reload_info
            switch_if_tuple = self._servo_interface.get(dut_hostname)
            if not switch_if_tuple:
                logging.error('Could not determine POE hostname for %s. '
                              'Please check the servo-interface mapping file.',
                              dut_hostname)
                return None
            rpm_hostname = switch_if_tuple[0]
        else:
            # Regular DUTs are managed by RPMs.
            rpm_hostname = RPM_REGEX.sub(DEFAULT_RPM_ID, dut_hostname, count=1)
        with self._lock:
            assigned_uri = self._rpm_dict.get(rpm_hostname)
            if assigned_uri:
                return assigned_uri
            logging.info('No Dispatcher assigned for RPM %s.', rpm_hostname)
            # Choose the least loaded dispatcher to communicate with the RPM.
            try:
                heap_entry = heapq.heappop(self._dispatcher_minheap)
            except IndexError:
                logging.error('Infrastructure Error: Frontend has no '
                              'registered dispatchers to field out this '
                              'request!')
                return None
            dispatcher_uri = heap_entry[DISPATCHER_URI]
            # Put this entry back in the heap with an RPM Count + 1.
            heap_entry[RPM_COUNT] += 1
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            logging.info('Assigning %s for RPM %s', dispatcher_uri,
                         rpm_hostname)
            self._rpm_dict[rpm_hostname] = dispatcher_uri
            return dispatcher_uri


    def register_dispatcher(self, dispatcher_uri):
        """
        Called by a dispatcher server so that the frontend server knows it is
        available to field out RPM requests.

        Adds an entry to the min heap and entry map for this dispatcher. A
        URI that is already registered keeps its existing entry.

        @param dispatcher_uri: Address of dispatcher server we are registering.
        """
        logging.info('Registering uri: %s as a rpm dispatcher.', dispatcher_uri)
        with self._lock:
            if self._entry_dict.get(dispatcher_uri):
                # Pushing a second entry for the same URI would leave a stale
                # duplicate in the heap that unregister_dispatcher can never
                # find, so a dead dispatcher could be handed out forever.
                logging.info('%s is already registered.', dispatcher_uri)
                return
            heap_entry = [DEFAULT_RPM_COUNT, dispatcher_uri]
            heapq.heappush(self._dispatcher_minheap, heap_entry)
            self._entry_dict[dispatcher_uri] = heap_entry


    def unregister_dispatcher(self, uri_to_unregister):
        """
        Called by a dispatcher server as it exits so that the frontend server
        knows that it is no longer available to field out requests.

        Removes from _rpm_dict all entries with the value of this dispatcher so
        that those RPMs can be reassigned to a new dispatcher (this happens
        even if the URI is not registered, so no stale assignment survives).

        Assigns an rpm count of TERMINATED (-1) to this dispatcher's heap entry
        so that it is pushed out of the min heap.

        @param uri_to_unregister: Address of dispatcher server we are
                                  unregistering.
        """
        logging.info('Unregistering uri: %s as a rpm dispatcher.',
                     uri_to_unregister)
        with self._lock:
            # Remove all RPM mappings to this dispatcher.
            for rpm_hostname, dispatcher in list(self._rpm_dict.items()):
                if dispatcher == uri_to_unregister:
                    del self._rpm_dict[rpm_hostname]
            heap_entry = self._entry_dict.pop(uri_to_unregister, None)
            if not heap_entry:
                logging.warning('%s was not registered.', uri_to_unregister)
                return
            heap_entry[RPM_COUNT] = TERMINATED
            # Re-sort the heap and remove any terminated dispatchers.
            heapq.heapify(self._dispatcher_minheap)
            self._remove_terminated_dispatchers()


    def _remove_terminated_dispatchers(self):
        """
        Pop entries off the head of the heap until a non-terminated dispatcher
        (or nothing) is at the top.

        Called from unregister_dispatcher with self._lock held, right after
        the heap was re-heapified.
        """
        heap = self._dispatcher_minheap
        # heapq keeps the smallest entry at index 0; TERMINATED (-1) is below
        # every live count (which start at DEFAULT_RPM_COUNT and only grow),
        # so all terminated entries surface at the head.
        while heap and heap[0][RPM_COUNT] == TERMINATED:
            heapq.heappop(heap)


    def suspend_emails(self, hours):
        """Suspend email notifications.

        No-op when no email handler was supplied.

        @param hours: How many hours to suspend email notifications.
        """
        if self._email_handler:
            self._email_handler.suspend_emails(hours)


    def resume_emails(self):
        """Resume email notifications. No-op when no email handler exists."""
        if self._email_handler:
            self._email_handler.resume_emails()
304
305
if __name__ == '__main__':
    # Entry point: build an RPMFrontendServer and serve it over a
    # MultiThreadedXMLRPCServer bound to the configured frontend address/port.
    if sys.argv[1:]:
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    frontend_server = RPMFrontendServer(
        email_handler=rpm_logging_config.set_up_logging(LOG_FILENAME_FORMAT))
    address = rpm_config.get('RPM_INFRASTRUCTURE', 'frontend_addr')
    port = rpm_config.getint('RPM_INFRASTRUCTURE', 'frontend_port')
    server = MultiThreadedXMLRPCServer((address, port), allow_none=True)
    server.register_instance(frontend_server)
    logging.info('Listening on %s port %d', address, port)
    server.serve_forever()