blob: 9a2ea97f915767f01d648d377e85c3833192eedd [file] [log] [blame]
Allen Li38c99602017-02-03 17:07:33 -08001import sys, socket, errno, logging
2from time import time, sleep
3from autotest_lib.client.common_lib import error, utils
mblighfadca202006-09-23 04:40:01 +00004
# TCP port barrier members listen on / connect to when no explicit port or
# listen_server is supplied.
_DEFAULT_PORT = 11922
7
def _get_host_from_id(hostid):
    """Return the host part of a barrier member id.

    A member id is either "<host>" or "<host>#<local identifier>". The
    optional "#..." suffix allows several members per host (particularly
    helpful in testing) and is discarded here.

    @param hostid: Member identifier string.
    @returns The text before the first '#' (the whole id if there is none).
    @raises error.BarrierError: if hostid starts with '#', i.e. no host
            address was given.
    """
    if hostid.startswith('#'):
        raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")
    host, _sep, _local_tag = hostid.partition('#')
    return host
17
18
class BarrierAbortError(error.BarrierError):
    """BarrierError subclass signalling an explicitly requested abort.

    Raised when a barrier member passed abort=True to its rendezvous call,
    as opposed to the barrier failing because of a timeout or lost member.
    """
21
22
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        Adds an iptables ACCEPT rule for the port, then creates the
        listening socket.

        @param address: The address to listen on ('' means all interfaces).
        @param port: The port to listen on.
        @raises socket.error: if the listening socket cannot be set up.
        """
        self.address = address
        self.port = port
        # Open the port so that the listening server can accept incoming
        # connections.
        # NOTE(review): a new rule is appended (-A) on every instantiation
        # and never removed here; confirm that accumulation is acceptable.
        utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
                  port)
        self.socket = self._setup()


    def _setup(self):
        """Create, bind and listen on the listening socket.

        @returns The bound, listening TCP socket.
        @raises socket.error: if setting options, binding or listening
                fails. The partially set-up socket is closed first so the
                file descriptor is not leaked.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Allow re-binding the port soon after a previous listener on it
            # was closed.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except BaseException:
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()
65
66
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set. As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag). They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange. If this is
    successful then everyone has checked in at the barrier. We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connects will fail. Client will retry until they either succeed in
    connecting to master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded to pong the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client. The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation. This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments. A member requesting an abort
    answers the master's ping with "abrt" instead of "pong"; the master
    then sends "abrt" instead of "rlse" and every member raises
    BarrierAbortError.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet, or
                None to wait without limit.
        @param port: Port number to listen on / connect to; defaults to
                _DEFAULT_PORT.
        @param listen_server: External listen_server instance to use instead
                of creating our own. Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).

        @raises error.BarrierError: if both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                        '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this member requested an abort; (re)set by each
        # rendezvous call.
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _update_timeout(self, timeout):
        """Move the deadline so that `timeout` seconds remain from now.

        @param timeout: Seconds that should remain, or None for no limit.
        """
        if timeout is not None and self._start_time is not None:
            # _timeout_secs is measured from _start_time, so add the time
            # already elapsed to leave exactly `timeout` seconds from now.
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """Return the number of seconds left before the barrier times out.

        @returns The remaining seconds, or None when no timeout is set.
                Before the rendezvous has started (no start time) the full
                configured timeout is returned.
        @raises error.BarrierError: if the deadline has already passed.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """Run the master side of the join handshake with one member.

        Reads "<tag> <name>" from the member. Invalid data is dropped; a tag
        mismatch is answered with "!tag" and a duplicate name with "!dup",
        and those connections are closed. Otherwise the member is told to
        "wait" and its connection is recorded in self._waiting.

        @param connection: A (socket, (host, port)) tuple.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            # Get the clients name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                                client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach. Confirm that
            # they are coming to the same meeting. Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning("client arriving for the wrong barrier: %s != %s",
                                self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """Run the slave side of the join handshake with the master.

        Sends "<tag> <hostid>" and expects "wait" back. On success the
        connection is recorded under our own hostid and self._seen is set
        to 1; otherwise the connection is closed and nothing is recorded.

        @param connection: A (socket, (host, port)) tuple.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when thats appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """Ping every waiting member, then release (or abort) all of them.

        @raises error.BarrierError: if any member failed to answer the ping
                with "pong" or "abrt".
        @raises BarrierAbortError: if this host or any member requested an
                abort; "abrt" is sent to every member before raising.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name, (client, _addr) in self._waiting.items():
            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # If every ones checks in then commit the release.
        for name, (client, _addr) in self._waiting.items():
            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error as err:
                # As documented on this class, failing to deliver the
                # release does not fail the barrier: that member simply
                # broke right at the start of the post-barrier window.
                logging.warning("release failed: %s (%s)", name, err)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close every recorded member connection (best effort).

        Either way, close out all the clients. If we have not released them
        then they know to abort.
        """
        for name, (client, _addr) in self._waiting.items():
            logging.info("closing client: %s", name)

            try:
                client.close()
            except Exception:
                # Best effort: a failing close must not mask the barrier
                # outcome that is already being reported.
                pass


    def _run_server(self, is_master):
        """Accept incoming member connections until the barrier completes.

        Used by the master in rendezvous() and by slaves in
        rendezvous_servers(). Waiting connections are always closed on exit.

        @param is_master: True to run the master handshake and release,
                False to run the slave handshake and wait.
        @raises error.BarrierError: on timeout or protocol failure.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()
            # if we created the listening_server in the beginning of this
            # function then close the listening socket here
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """Actively connect out to the peers until the barrier completes.

        The master (rendezvous_servers) calls each member in sorted order;
        a slave (rendezvous) calls the master. Refused or timed-out
        connects are retried every 10 seconds until the deadline. Waiting
        connections are always closed on exit.

        @param is_master: True to call every member and release them,
                False to call the master and wait.
        @raises error.BarrierError: on timeout or protocol failure.
        @raises socket.error: for connect errors other than ECONNREFUSED /
                ETIMEDOUT.
        """
        try:
            while self._remaining() is None or self._remaining() > 0:
                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.settimeout(30)
                try:
                    if is_master:
                        # Connect to all slaves.
                        host = _get_host_from_id(self._members[self._seen])
                        logging.info("calling slave: %s", host)
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._master_welcome(connection)
                    else:
                        # Just connect to the master.
                        host = _get_host_from_id(self._masterid)
                        logging.info("calling master")
                        connection = (remote, (host, self._port))
                        remote.connect(connection[1])
                        self._slave_hello(connection)
                except socket.timeout:
                    # Close the failed attempt's socket before retrying so
                    # that file descriptors do not pile up.
                    remote.close()
                    logging.warning("timeout calling host, retry")
                    sleep(10)
                except socket.error as err:
                    remote.close()
                    if err.errno not in (errno.ECONNREFUSED, errno.ETIMEDOUT):
                        raise
                    sleep(10)

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()


    def _slave_wait(self):
        """Follow the master's ping/rlse messages until it closes the link.

        @raises error.BarrierError: if the connection ends in any state other
                than released ("rlse").
        @raises BarrierAbortError: if the master sent "abrt".
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """Wait until every host in `hosts` has reached this barrier.

        The first host in sort order is the master and listens; the others
        connect to it. Returns immediately if no other members are listed.

        @param hosts: All member ids taking part, including this one.
        @param abort: (keyword) if True, this will raise an exception on
                all the clients.
        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any member requested an abort.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """Like rendezvous(), but the members listen and the master calls.

        @param masterid: Id of the master, which connects out to each host.
        @param hosts: Member ids the master calls (presumably excluding
                masterid -- confirm against callers).
        @param abort: (keyword) if True, this will raise an exception on
                all the clients.
        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any member requested an abort.
        """
        self._start_time = time()
        self._members = list(hosts)
        self._members.sort()
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)
547 self._run_server(is_master=False)