Allen Li | 38c9960 | 2017-02-03 17:07:33 -0800 | [diff] [blame] | 1 | import sys, socket, errno, logging |
| 2 | from time import time, sleep |
| 3 | from autotest_lib.client.common_lib import error, utils |
mbligh | fadca20 | 2006-09-23 04:40:01 +0000 | [diff] [blame] | 4 | |
Allen Li | 38c9960 | 2017-02-03 17:07:33 -0800 | [diff] [blame] | 5 | # default barrier port |
| 6 | _DEFAULT_PORT = 11922 |
| 7 | |
def _get_host_from_id(hostid):
    """
    Return the host portion of a barrier member id.

    A member id is a host address optionally followed by '#' and a local
    identifier. The suffix allows multiple members per host, which is
    particularly helpful in testing.

    @param hostid: Member id string, e.g. 'host' or 'host#local'.
    @returns The text preceding the first '#'.
    @raises error.BarrierError: If the id starts with '#', i.e. it carries
            no host address.
    """
    if hostid.startswith('#'):
        raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")

    host, _, _ = hostid.partition('#')
    return host
| 17 | |
| 18 | |
class BarrierAbortError(error.BarrierError):
    """Raised when a barrier member explicitly requests an abort."""
| 21 | |
| 22 | |
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        Appends an iptables ACCEPT rule for the port and then creates the
        listening socket.

        @param address: The address to listen on.
        @param port: The port to listen on (formatted with %d, so it must
                be an integer).
        """
        self.address = address
        self.port = port
        # Open the port so that the listening server can accept incoming
        # connections.
        utils.run('iptables -A INPUT -p tcp -m tcp --dport %d -j ACCEPT' %
                  port)
        self.socket = self._setup()


    def _setup(self):
        """
        Create, bind and listen on the listening socket.

        @returns The listening socket object.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except Exception:
            # Do not leak the descriptor when the address/port cannot be
            # claimed; the caller never receives the socket to close it.
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()
| 65 | |
| 66 | |
| 67 | class barrier(object): |
| 68 | """Multi-machine barrier support. |
| 69 | |
| 70 | Provides multi-machine barrier mechanism. |
| 71 | Execution stops until all members arrive at the barrier. |
| 72 | |
| 73 | Implementation Details: |
| 74 | ....................... |
| 75 | |
| 76 | When a barrier is forming the master node (first in sort order) in the |
| 77 | set accepts connections from each member of the set. As they arrive |
| 78 | they indicate the barrier they are joining and their identifier (their |
| 79 | hostname or IP address and optional tag). They are then asked to wait. |
| 80 | When all members are present the master node then checks that each |
| 81 | member is still responding via a ping/pong exchange. If this is |
| 82 | successful then everyone has checked in at the barrier. We then tell |
| 83 | everyone they may continue via a rlse message. |
| 84 | |
    If a client reaches the barrier before the master is listening, its
    connection attempt fails. Clients keep retrying until they either
    connect to the master or the overall timeout is exceeded.
| 88 | |
| 89 | As an example here is the exchange for a three node barrier called |
| 90 | 'TAG' |
| 91 | |
| 92 | MASTER CLIENT1 CLIENT2 |
| 93 | <-------------TAG C1------------- |
| 94 | --------------wait--------------> |
| 95 | [...] |
| 96 | <-------------TAG C2----------------------------- |
| 97 | --------------wait------------------------------> |
| 98 | [...] |
| 99 | --------------ping--------------> |
| 100 | <-------------pong--------------- |
| 101 | --------------ping------------------------------> |
| 102 | <-------------pong------------------------------- |
| 103 | ----- BARRIER conditions MET ----- |
| 104 | --------------rlse--------------> |
| 105 | --------------rlse------------------------------> |
| 106 | |
    Note that once the last client has answered the ping with a pong the
    barrier is implicitly deemed satisfied: they have all acknowledged their
    presence. If we fail to send any of the rlse messages the barrier is
    still a success; the failed host has effectively broken 'right at the
    beginning' of the post barrier execution window.
| 112 | |
| 113 | In addition, there is another rendezvous, that makes each slave a server |
| 114 | and the master a client. The connection process and usage is still the |
| 115 | same but allows barriers from machines that only have a one-way |
| 116 | connection initiation. This is called rendezvous_servers. |
| 117 | |
| 118 | For example: |
| 119 | if ME == SERVER: |
| 120 | server start |
| 121 | |
| 122 | b = job.barrier(ME, 'server-up', 120) |
| 123 | b.rendezvous(CLIENT, SERVER) |
| 124 | |
| 125 | if ME == CLIENT: |
| 126 | client run |
| 127 | |
| 128 | b = job.barrier(ME, 'test-complete', 3600) |
| 129 | b.rendezvous(CLIENT, SERVER) |
| 130 | |
| 131 | if ME == SERVER: |
| 132 | server stop |
| 133 | |
| 134 | Any client can also request an abort of the job by setting |
| 135 | abort=True in the rendezvous arguments. |
| 136 | """ |
| 137 | |
| 138 | def __init__(self, hostid, tag, timeout=None, port=None, |
| 139 | listen_server=None): |
| 140 | """ |
| 141 | @param hostid: My hostname/IP address + optional tag. |
| 142 | @param tag: Symbolic name of the barrier in progress. |
| 143 | @param timeout: Maximum seconds to wait for a the barrier to meet. |
| 144 | @param port: Port number to listen on. |
| 145 | @param listen_server: External listen_server instance to use instead |
| 146 | of creating our own. Create a listen_server instance and |
| 147 | reuse it across multiple barrier instances so that the |
| 148 | barrier code doesn't try to quickly re-bind on the same port |
| 149 | (packets still in transit for the previous barrier they may |
| 150 | reset new connections). |
| 151 | """ |
| 152 | self._hostid = hostid |
| 153 | self._tag = tag |
| 154 | if listen_server: |
| 155 | if port: |
| 156 | raise error.BarrierError( |
| 157 | '"port" and "listen_server" are mutually exclusive.') |
| 158 | self._port = listen_server.port |
| 159 | else: |
| 160 | self._port = port or _DEFAULT_PORT |
| 161 | self._server = listen_server # A listen_server instance or None. |
| 162 | self._members = [] # List of hosts we expect to find at the barrier. |
| 163 | self._timeout_secs = timeout |
| 164 | self._start_time = None # Timestamp of when we started waiting. |
| 165 | self._masterid = None # Host/IP + optional tag of selected master. |
| 166 | logging.info("tag=%s port=%d timeout=%r", |
| 167 | self._tag, self._port, self._timeout_secs) |
| 168 | |
| 169 | # Number of clients seen (should be the length of self._waiting). |
| 170 | self._seen = 0 |
| 171 | |
| 172 | # Clients who have checked in and are waiting (if we are a master). |
| 173 | self._waiting = {} # Maps from hostname -> (client, addr) tuples. |
| 174 | |
| 175 | |
| 176 | def _update_timeout(self, timeout): |
| 177 | if timeout is not None and self._start_time is not None: |
| 178 | self._timeout_secs = (time() - self._start_time) + timeout |
| 179 | else: |
| 180 | self._timeout_secs = timeout |
| 181 | |
| 182 | |
| 183 | def _remaining(self): |
| 184 | if self._timeout_secs is not None and self._start_time is not None: |
| 185 | timeout = self._timeout_secs - (time() - self._start_time) |
| 186 | if timeout <= 0: |
| 187 | errmsg = "timeout waiting for barrier: %s" % self._tag |
| 188 | logging.error(error) |
| 189 | raise error.BarrierError(errmsg) |
| 190 | else: |
| 191 | timeout = self._timeout_secs |
| 192 | |
| 193 | if self._timeout_secs is not None: |
| 194 | logging.info("seconds remaining: %d", timeout) |
| 195 | return timeout |
| 196 | |
| 197 | |
| 198 | def _master_welcome(self, connection): |
| 199 | client, addr = connection |
| 200 | name = None |
| 201 | |
| 202 | client.settimeout(5) |
| 203 | try: |
| 204 | # Get the clients name. |
| 205 | intro = client.recv(1024) |
| 206 | intro = intro.strip("\r\n") |
| 207 | |
| 208 | intro_parts = intro.split(' ', 2) |
| 209 | if len(intro_parts) != 2: |
| 210 | logging.warning("Ignoring invalid data from %s: %r", |
| 211 | client.getpeername(), intro) |
| 212 | client.close() |
| 213 | return |
| 214 | tag, name = intro_parts |
| 215 | |
| 216 | logging.info("new client tag=%s, name=%s", tag, name) |
| 217 | |
| 218 | # Ok, we know who is trying to attach. Confirm that |
| 219 | # they are coming to the same meeting. Also, everyone |
| 220 | # should be using a unique handle (their IP address). |
| 221 | # If we see a duplicate, something _bad_ has happened |
| 222 | # so drop them now. |
| 223 | if self._tag != tag: |
| 224 | logging.warning("client arriving for the wrong barrier: %s != %s", |
| 225 | self._tag, tag) |
| 226 | client.settimeout(5) |
| 227 | client.send("!tag") |
| 228 | client.close() |
| 229 | return |
| 230 | elif name in self._waiting: |
| 231 | logging.warning("duplicate client") |
| 232 | client.settimeout(5) |
| 233 | client.send("!dup") |
| 234 | client.close() |
| 235 | return |
| 236 | |
| 237 | # Acknowledge the client |
| 238 | client.send("wait") |
| 239 | |
| 240 | except socket.timeout: |
| 241 | # This is nominally an error, but as we do not know |
| 242 | # who that was we cannot do anything sane other |
| 243 | # than report it and let the normal timeout kill |
| 244 | # us when thats appropriate. |
| 245 | logging.warning("client handshake timeout: (%s:%d)", |
| 246 | addr[0], addr[1]) |
| 247 | client.close() |
| 248 | return |
| 249 | |
| 250 | logging.info("client now waiting: %s (%s:%d)", |
| 251 | name, addr[0], addr[1]) |
| 252 | |
| 253 | # They seem to be valid record them. |
| 254 | self._waiting[name] = connection |
| 255 | self._seen += 1 |
| 256 | |
| 257 | |
| 258 | def _slave_hello(self, connection): |
| 259 | (client, addr) = connection |
| 260 | name = None |
| 261 | |
| 262 | client.settimeout(5) |
| 263 | try: |
| 264 | client.send(self._tag + " " + self._hostid) |
| 265 | |
| 266 | reply = client.recv(4) |
| 267 | reply = reply.strip("\r\n") |
| 268 | logging.info("master said: %s", reply) |
| 269 | |
| 270 | # Confirm the master accepted the connection. |
| 271 | if reply != "wait": |
| 272 | logging.warning("Bad connection request to master") |
| 273 | client.close() |
| 274 | return |
| 275 | |
| 276 | except socket.timeout: |
| 277 | # This is nominally an error, but as we do not know |
| 278 | # who that was we cannot do anything sane other |
| 279 | # than report it and let the normal timeout kill |
| 280 | # us when thats appropriate. |
| 281 | logging.error("master handshake timeout: (%s:%d)", |
| 282 | addr[0], addr[1]) |
| 283 | client.close() |
| 284 | return |
| 285 | |
| 286 | logging.info("slave now waiting: (%s:%d)", addr[0], addr[1]) |
| 287 | |
| 288 | # They seem to be valid record them. |
| 289 | self._waiting[self._hostid] = connection |
| 290 | self._seen = 1 |
| 291 | |
| 292 | |
| 293 | def _master_release(self): |
| 294 | # Check everyone is still there, that they have not |
| 295 | # crashed or disconnected in the meantime. |
| 296 | allpresent = True |
| 297 | abort = self._abort |
| 298 | for name in self._waiting: |
| 299 | (client, addr) = self._waiting[name] |
| 300 | |
| 301 | logging.info("checking client present: %s", name) |
| 302 | |
| 303 | client.settimeout(5) |
| 304 | reply = 'none' |
| 305 | try: |
| 306 | client.send("ping") |
| 307 | reply = client.recv(1024) |
| 308 | except socket.timeout: |
| 309 | logging.warning("ping/pong timeout: %s", name) |
| 310 | pass |
| 311 | |
| 312 | if reply == 'abrt': |
| 313 | logging.warning("Client %s requested abort", name) |
| 314 | abort = True |
| 315 | elif reply != "pong": |
| 316 | allpresent = False |
| 317 | |
| 318 | if not allpresent: |
| 319 | raise error.BarrierError("master lost client") |
| 320 | |
| 321 | if abort: |
| 322 | logging.info("Aborting the clients") |
| 323 | msg = 'abrt' |
| 324 | else: |
| 325 | logging.info("Releasing clients") |
| 326 | msg = 'rlse' |
| 327 | |
| 328 | # If every ones checks in then commit the release. |
| 329 | for name in self._waiting: |
| 330 | (client, addr) = self._waiting[name] |
| 331 | |
| 332 | client.settimeout(5) |
| 333 | try: |
| 334 | client.send(msg) |
| 335 | except socket.timeout: |
| 336 | logging.warning("release timeout: %s", name) |
| 337 | pass |
| 338 | |
| 339 | if abort: |
| 340 | raise BarrierAbortError("Client requested abort") |
| 341 | |
| 342 | |
| 343 | def _waiting_close(self): |
| 344 | # Either way, close out all the clients. If we have |
| 345 | # not released them then they know to abort. |
| 346 | for name in self._waiting: |
| 347 | (client, addr) = self._waiting[name] |
| 348 | |
| 349 | logging.info("closing client: %s", name) |
| 350 | |
| 351 | try: |
| 352 | client.close() |
| 353 | except: |
| 354 | pass |
| 355 | |
| 356 | |
| 357 | def _run_server(self, is_master): |
| 358 | server = self._server or listen_server(port=self._port) |
| 359 | failed = 0 |
| 360 | try: |
| 361 | while True: |
| 362 | try: |
| 363 | # Wait for callers welcoming each. |
| 364 | server.socket.settimeout(self._remaining()) |
| 365 | connection = server.socket.accept() |
| 366 | if is_master: |
| 367 | self._master_welcome(connection) |
| 368 | else: |
| 369 | self._slave_hello(connection) |
| 370 | except socket.timeout: |
| 371 | logging.warning("timeout waiting for remaining clients") |
| 372 | pass |
| 373 | |
| 374 | if is_master: |
| 375 | # Check if everyone is here. |
| 376 | logging.info("master seen %d of %d", |
| 377 | self._seen, len(self._members)) |
| 378 | if self._seen == len(self._members): |
| 379 | self._master_release() |
| 380 | break |
| 381 | else: |
| 382 | # Check if master connected. |
| 383 | if self._seen: |
| 384 | logging.info("slave connected to master") |
| 385 | self._slave_wait() |
| 386 | break |
| 387 | finally: |
| 388 | self._waiting_close() |
| 389 | # if we created the listening_server in the beginning of this |
| 390 | # function then close the listening socket here |
| 391 | if not self._server: |
| 392 | server.close() |
| 393 | |
| 394 | |
| 395 | def _run_client(self, is_master): |
| 396 | while self._remaining() is None or self._remaining() > 0: |
| 397 | try: |
| 398 | remote = socket.socket(socket.AF_INET, |
| 399 | socket.SOCK_STREAM) |
| 400 | remote.settimeout(30) |
| 401 | if is_master: |
| 402 | # Connect to all slaves. |
| 403 | host = _get_host_from_id(self._members[self._seen]) |
| 404 | logging.info("calling slave: %s", host) |
| 405 | connection = (remote, (host, self._port)) |
| 406 | remote.connect(connection[1]) |
| 407 | self._master_welcome(connection) |
| 408 | else: |
| 409 | # Just connect to the master. |
| 410 | host = _get_host_from_id(self._masterid) |
| 411 | logging.info("calling master") |
| 412 | connection = (remote, (host, self._port)) |
| 413 | remote.connect(connection[1]) |
| 414 | self._slave_hello(connection) |
| 415 | except socket.timeout: |
| 416 | logging.warning("timeout calling host, retry") |
| 417 | sleep(10) |
| 418 | pass |
| 419 | except socket.error, err: |
| 420 | (code, str) = err |
Ting-Yuan Huang | 0ee498c | 2017-08-25 00:06:04 -0700 | [diff] [blame] | 421 | if (code != errno.ECONNREFUSED and |
| 422 | code != errno.ETIMEDOUT): |
Allen Li | 38c9960 | 2017-02-03 17:07:33 -0800 | [diff] [blame] | 423 | raise |
| 424 | sleep(10) |
| 425 | |
| 426 | if is_master: |
| 427 | # Check if everyone is here. |
| 428 | logging.info("master seen %d of %d", |
| 429 | self._seen, len(self._members)) |
| 430 | if self._seen == len(self._members): |
| 431 | self._master_release() |
| 432 | break |
| 433 | else: |
| 434 | # Check if master connected. |
| 435 | if self._seen: |
| 436 | logging.info("slave connected to master") |
| 437 | self._slave_wait() |
| 438 | break |
| 439 | |
| 440 | self._waiting_close() |
| 441 | |
| 442 | |
| 443 | def _slave_wait(self): |
| 444 | remote = self._waiting[self._hostid][0] |
| 445 | mode = "wait" |
| 446 | while True: |
| 447 | # All control messages are the same size to allow |
| 448 | # us to split individual messages easily. |
| 449 | remote.settimeout(self._remaining()) |
| 450 | reply = remote.recv(4) |
| 451 | if not reply: |
| 452 | break |
| 453 | |
| 454 | reply = reply.strip("\r\n") |
| 455 | logging.info("master said: %s", reply) |
| 456 | |
| 457 | mode = reply |
| 458 | if reply == "ping": |
| 459 | # Ensure we have sufficient time for the |
| 460 | # ping/pong/rlse cyle to complete normally. |
| 461 | self._update_timeout(10 + 10 * len(self._members)) |
| 462 | |
| 463 | if self._abort: |
| 464 | msg = "abrt" |
| 465 | else: |
| 466 | msg = "pong" |
| 467 | logging.info(msg) |
| 468 | remote.settimeout(self._remaining()) |
| 469 | remote.send(msg) |
| 470 | |
| 471 | elif reply == "rlse" or reply == "abrt": |
| 472 | # Ensure we have sufficient time for the |
| 473 | # ping/pong/rlse cyle to complete normally. |
| 474 | self._update_timeout(10 + 10 * len(self._members)) |
| 475 | |
| 476 | logging.info("was released, waiting for close") |
| 477 | |
| 478 | if mode == "rlse": |
| 479 | pass |
| 480 | elif mode == "wait": |
| 481 | raise error.BarrierError("master abort -- barrier timeout") |
| 482 | elif mode == "ping": |
| 483 | raise error.BarrierError("master abort -- client lost") |
| 484 | elif mode == "!tag": |
| 485 | raise error.BarrierError("master abort -- incorrect tag") |
| 486 | elif mode == "!dup": |
| 487 | raise error.BarrierError("master abort -- duplicate client") |
| 488 | elif mode == "abrt": |
| 489 | raise BarrierAbortError("Client requested abort") |
| 490 | else: |
| 491 | raise error.BarrierError("master handshake failure: " + mode) |
| 492 | |
| 493 | |
| 494 | def rendezvous(self, *hosts, **dargs): |
| 495 | # if called with abort=True, this will raise an exception |
| 496 | # on all the clients. |
| 497 | self._start_time = time() |
| 498 | self._members = list(hosts) |
| 499 | self._members.sort() |
| 500 | self._masterid = self._members.pop(0) |
| 501 | self._abort = dargs.get('abort', False) |
| 502 | |
| 503 | logging.info("masterid: %s", self._masterid) |
| 504 | if self._abort: |
| 505 | logging.debug("%s is aborting", self._hostid) |
| 506 | if not len(self._members): |
| 507 | logging.info("No other members listed.") |
| 508 | return |
| 509 | logging.info("members: %s", ",".join(self._members)) |
| 510 | |
| 511 | self._seen = 0 |
| 512 | self._waiting = {} |
| 513 | |
| 514 | # Figure out who is the master in this barrier. |
| 515 | if self._hostid == self._masterid: |
| 516 | logging.info("selected as master") |
| 517 | self._run_server(is_master=True) |
| 518 | else: |
| 519 | logging.info("selected as slave") |
| 520 | self._run_client(is_master=False) |
| 521 | |
| 522 | |
| 523 | def rendezvous_servers(self, masterid, *hosts, **dargs): |
| 524 | # if called with abort=True, this will raise an exception |
| 525 | # on all the clients. |
| 526 | self._start_time = time() |
| 527 | self._members = list(hosts) |
| 528 | self._members.sort() |
| 529 | self._masterid = masterid |
| 530 | self._abort = dargs.get('abort', False) |
| 531 | |
| 532 | logging.info("masterid: %s", self._masterid) |
| 533 | if not len(self._members): |
| 534 | logging.info("No other members listed.") |
| 535 | return |
| 536 | logging.info("members: %s", ",".join(self._members)) |
| 537 | |
| 538 | self._seen = 0 |
| 539 | self._waiting = {} |
| 540 | |
| 541 | # Figure out who is the master in this barrier. |
| 542 | if self._hostid == self._masterid: |
| 543 | logging.info("selected as master") |
| 544 | self._run_client(is_master=True) |
| 545 | else: |
| 546 | logging.info("selected as slave") |
| 547 | self._run_server(is_master=False) |