blob: e4de635fcf6c22ff9073b597833c24b367947681 [file] [log] [blame]
mbligh999fb132010-04-23 17:22:03 +00001import sys, socket, errno, logging
2from time import time, sleep
3from autotest_lib.client.common_lib import error
4
5# default barrier port
6_DEFAULT_PORT = 11922
7
8
class BarrierAbortError(error.BarrierError):
    """BarrierError subclass signalling a deliberately requested abort.

    The master raises it after sending 'abrt' to every waiting client, and a
    slave raises it when the last control message from the master was 'abrt'.
    Catching error.BarrierError therefore also catches this exception.
    """
11
12
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        The socket is created, bound and put into listening state
        immediately.

        @param address: The address to listen on ('' means all interfaces).
        @param port: The port to listen on.
        @raises socket.error: if the socket cannot be bound or listened on.
        """
        self.address = address
        self.port = port
        self.socket = self._setup()


    def _setup(self):
        """Create, bind and listen on the listening socket.

        @return: The listening socket object.
        @raises socket.error: if any setup step fails. The partially set-up
                socket is closed before re-raising so its file descriptor
                does not leak.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # SO_REUSEADDR lets the port be re-bound soon after a previous
            # listener on it was closed.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except Exception:
            # Release the descriptor, then propagate the original error.
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()
51
52
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set. As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag). They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange. If this is
    successful then everyone has checked in at the barrier. We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connects will fail. Clients will retry until they either succeed in
    connecting to the master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has answered the ping with pong the
    barrier is implicitly deemed satisfied: they have all acknowledged their
    presence. If we fail to send any of the rlse messages the barrier is
    still a success; the failed host has effectively broken 'right at the
    beginning' of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client. The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation. This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments. Such a client answers the
    master's ping with 'abrt' instead of 'pong'. The master then sends
    'abrt' instead of 'rlse' to everyone, and every participant raises
    BarrierAbortError.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet
                (None waits forever).
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own. Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).
        @raises error.BarrierError: if both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                    '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this host requested an abort. rendezvous*() overwrites it,
        # but initialising it here keeps every attribute defined.
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _get_host_from_id(self, hostid):
        """Return the connectable host part of a member id.

        Any trailing local identifier following a '#' is removed. This allows
        multiple members per host, which is particularly helpful in testing.

        @param hostid: Member id of the form 'host' or 'host#tag'.
        @return: The part of hostid before the first '#'.
        @raises error.BarrierError: if hostid has no host part (starts
                with '#').
        """
        if hostid.startswith('#'):
            raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")
        return hostid.split('#')[0]


    def _update_timeout(self, timeout):
        """Allow `timeout` more seconds from now before the barrier times out.

        The stored timeout is measured from self._start_time, so the elapsed
        time is added to it. Before waiting has started (or with a None
        timeout) the value is stored as-is.
        """
        if timeout is not None and self._start_time is not None:
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """Return the seconds left before the barrier deadline.

        @return: Seconds remaining, or None if there is no timeout. Before
                waiting has started, the full configured timeout is returned.
        @raises error.BarrierError: if waiting has started and the deadline
                has passed.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                # Log the message itself (previously the `error` module
                # object was logged by mistake).
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """Perform the master side of the join handshake on one connection.

        Reads '<tag> <name>' from the member. If the tag matches this barrier
        and the name is not already waiting, it replies 'wait' and records
        the connection. On a wrong tag it sends '!tag', on a duplicate name
        it sends '!dup', and on malformed data or a handshake timeout it
        sends nothing. In all of these rejection cases the socket is closed
        and nothing is recorded.

        @param connection: (socket, address) tuple for the member.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            # Get the client's name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                # Use the address we already have; getpeername() can fail
                # if the peer has already gone away.
                logging.warning("Ignoring invalid data from %s: %r",
                                addr, intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach. Confirm that
            # they are coming to the same meeting. Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning(
                    "client arriving for the wrong barrier: %s != %s",
                    self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid; record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """Perform the slave side of the join handshake on one connection.

        Sends '<tag> <hostid>' and expects 'wait' back. On success the
        connection is stored under our own hostid and self._seen is set to 1.
        Otherwise the socket is closed.

        @param connection: (socket, address) tuple for the master.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # The master accepted us; record the connection.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """Ping every waiting client, then release or abort them all.

        @raises error.BarrierError: if any client did not answer the ping
                with 'pong' or 'abrt'.
        @raises BarrierAbortError: if this host or any client requested an
                abort. 'abrt' is sent to every client before raising.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name, (client, _) in self._waiting.items():
            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # Everyone checked in, so commit the release. A failed send is only
        # logged: the barrier is already satisfied, so that host has simply
        # failed after it.
        for name, (client, _) in self._waiting.items():
            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error as err:
                logging.warning("release failed: %s (%s)", name, err)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close every connection recorded in self._waiting (best effort)."""
        # Either way, close out all the clients. If we have
        # not released them then they know to abort.
        for name, (client, _) in self._waiting.items():
            logging.info("closing client: %s", name)

            try:
                client.close()
            except socket.error:
                # Best effort: a socket that fails to close is abandoned.
                pass


    def _barrier_complete(self, is_master):
        """Finish the barrier if enough members have checked in.

        As master, this pings and releases everyone once all members are
        waiting. As slave, it follows the master's messages once connected.

        @param is_master: True when running the master side.
        @return: True if the barrier was resolved and the caller should stop
                accepting or connecting, otherwise False.
        """
        if is_master:
            # Check if everyone is here.
            logging.info("master seen %d of %d",
                         self._seen, len(self._members))
            if self._seen == len(self._members):
                self._master_release()
                return True
        elif self._seen:
            # The master has accepted our handshake.
            logging.info("slave connected to master")
            self._slave_wait()
            return True
        return False


    def _run_server(self, is_master):
        """Accept member connections on the barrier port until it resolves.

        A listen_server is created (and closed afterwards) unless one was
        passed to the constructor. All member connections are closed on
        exit, whether or not an exception was raised.

        @param is_master: True to run the master handshake on each accepted
                connection, False to run the slave handshake.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers, welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if self._barrier_complete(is_master):
                    break
        finally:
            self._waiting_close()
            # If we created the listening server at the beginning of this
            # function then close the listening socket here.
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """Connect out to the other side until the barrier resolves.

        As master, this connects to each member in turn. As slave, it
        connects to the master. Connection timeouts and refusals are retried
        every 10 seconds until the barrier timeout expires. All member
        connections are closed on exit, whether or not an exception was
        raised.

        @param is_master: True to call each slave, False to call the master.
        @raises socket.error: for socket errors other than a timeout or
                ECONNREFUSED.
        """
        try:
            while True:
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                if is_master:
                    # Call the next slave that has not checked in yet.
                    host = self._get_host_from_id(self._members[self._seen])
                    logging.info("calling slave: %s", host)
                    handshake = self._master_welcome
                else:
                    # Just connect to the master.
                    host = self._get_host_from_id(self._masterid)
                    logging.info("calling master")
                    handshake = self._slave_hello

                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.settimeout(30)
                connection = (remote, (host, self._port))
                try:
                    remote.connect(connection[1])
                    handshake(connection)
                except socket.timeout:
                    # The handshake helpers catch their own timeouts, so
                    # this comes from connect(). Drop the socket and retry.
                    remote.close()
                    logging.warning("timeout calling host, retry")
                    sleep(10)
                except socket.error as err:
                    # Close the failed socket so retries do not leak
                    # descriptors. A refused connection means the peer is
                    # not listening yet, so retry; anything else propagates.
                    remote.close()
                    if getattr(err, 'errno', None) != errno.ECONNREFUSED:
                        raise
                    sleep(10)

                if self._barrier_complete(is_master):
                    break
        finally:
            self._waiting_close()


    def _slave_wait(self):
        """Follow the master's control messages until it closes the socket.

        Each 'ping' is answered with 'pong', or with 'abrt' if this host
        requested an abort. The outcome is decided by the last message
        received before the connection closed.

        @raises BarrierAbortError: if the last message was 'abrt'.
        @raises error.BarrierError: if the last message was anything other
                than 'rlse' or 'abrt'.
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                msg = "abrt" if self._abort else "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply in ("rlse", "abrt"):
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            return
        if mode == "abrt":
            raise BarrierAbortError("Client requested abort")

        # The last message received explains why the master gave up on us.
        failures = {
            "wait": "master abort -- barrier timeout",
            "ping": "master abort -- client lost",
            "!tag": "master abort -- incorrect tag",
            "!dup": "master abort -- duplicate client",
        }
        raise error.BarrierError(
            failures.get(mode, "master handshake failure: " + mode))


    def rendezvous(self, *hosts, **dargs):
        """Wait until every listed member reaches this barrier.

        The first host in sorted order is the master and listens for the
        others. Every other host connects to it.

        @param hosts: Member ids of all participants (this host included).
        @param abort: Keyword-only, default False. If True, the barrier ends
                with BarrierAbortError on every participant instead of a
                normal release.
        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any participant requested an abort.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """Like rendezvous(), but with the connection direction reversed.

        The given master connects out to each host in `hosts`, and those
        hosts listen. This is useful when connections can only be initiated
        in one direction.

        @param masterid: Member id of the master.
        @param hosts: Member ids of the other participants.
        @param abort: Keyword-only, default False. If True, the barrier ends
                with BarrierAbortError on every participant instead of a
                normal release.
        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any participant requested an abort.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)
543 self._run_server(is_master=False)