mbligh | 999fb13 | 2010-04-23 17:22:03 +0000 | [diff] [blame] | 1 | import sys, socket, errno, logging |
| 2 | from time import time, sleep |
| 3 | from autotest_lib.client.common_lib import error |
| 4 | |
# Default TCP port for barriers: listen_server binds to it and barrier
# members connect to it unless a port or listen_server is passed explicitly.
_DEFAULT_PORT = 11922
| 7 | |
| 8 | |
class BarrierAbortError(error.BarrierError):
    """
    BarrierError subclass signalling an explicitly requested abort.

    Raised instead of a plain BarrierError when a barrier member asked
    (via abort=True) for the rendezvous to be aborted rather than released.
    """
    pass
| 11 | |
| 12 | |
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        The socket is created, bound and put into listening mode
        immediately.

        @param address: The address to listen on ('' binds to all
                interfaces).
        @param port: The port to listen on.

        @raises socket.error: if the socket cannot be bound or put into
                listening mode.
        """
        self.address = address
        self.port = port
        self.socket = self._setup()


    def _setup(self):
        """
        Create, bind and listen on the listening socket.

        SO_REUSEADDR is set so the port can be re-bound quickly, e.g. while
        connections from an earlier barrier are still lingering.

        If any step fails, the half-configured socket is closed before the
        exception propagates, so a failed bind does not leak a descriptor.

        @return: The listening socket object.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
        except Exception:
            sock.close()
            raise

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()
| 51 | |
| 52 | |
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming, the master node of the set (first in sort
    order) accepts connections from each member of the set. As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag). They are then asked to wait.
    When all members are present, the master node checks that each member
    is still responding via a ping/pong exchange. If this is successful,
    everyone has checked in at the barrier. We then tell everyone they may
    continue via a rlse message.

    Where the master is not the first to reach the barrier, the clients'
    connection attempts will fail. Clients retry until they either succeed
    in connecting to the master or the overall timeout is exceeded.

    As an example, here is the exchange for a three node barrier called
    'TAG':

        MASTER                        CLIENT1         CLIENT2
          <-------------TAG C1-------------
          --------------wait-------------->
                      [...]
          <-------------TAG C2-----------------------------
          --------------wait------------------------------>
                      [...]
          --------------ping-------------->
          <-------------pong---------------
          --------------ping------------------------------>
          <-------------pong-------------------------------
                  ----- BARRIER conditions MET -----
          --------------rlse-------------->
          --------------rlse------------------------------>

    Note that once the last client has answered the ping with a pong, the
    barrier is implicitly deemed satisfied: every member has acknowledged
    its presence. If we fail to send any of the rlse messages, the barrier
    is still a success; the failed host has effectively broken 'right at
    the beginning' of the post-barrier execution window.

    In addition, there is another rendezvous that makes each slave a server
    and the master a client. The connection process and usage are otherwise
    the same, but this allows barriers between machines that only support
    one-way connection initiation. This is called rendezvous_servers.

    For example (using rendezvous):
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any member can also request an abort of the job by passing abort=True
    to the rendezvous call. The master then sends 'abrt' instead of 'rlse',
    and every member raises BarrierAbortError.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet
                (None waits forever).
        @param port: Port number to listen on / connect to.
        @param listen_server: External listen_server instance to use instead
                of creating our own. Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may reset
                new connections).

        @raises error.BarrierError: if both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server:
            if port:
                raise error.BarrierError(
                    '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this member asked to abort. Reset by every rendezvous
        # call, but initialised here so the attribute always exists.
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _get_host_from_id(self, hostid):
        """
        Return the host part of a member id.

        Anything following a '#' is a local identifier. Stripping it allows
        multiple members per host, which is particularly helpful in testing.

        @param hostid: Member id of the form "host" or "host#tag".
        @return: The host portion of hostid.
        @raises error.BarrierError: if hostid has no host part.
        """
        if hostid.startswith('#'):
            raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")
        return hostid.split('#')[0]


    def _update_timeout(self, timeout):
        """
        Move the deadline so that `timeout` seconds remain from now.

        The deadline is stored relative to self._start_time, so the time
        already elapsed is added once waiting has started.

        @param timeout: Seconds from now, or None for no timeout.
        """
        if timeout is not None and self._start_time is not None:
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """
        Return the seconds left before the barrier times out.

        @return: Seconds remaining (possibly a float), or None when there is
                no timeout.
        @raises error.BarrierError: if the timeout has already expired.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """
        Run the master side of the handshake on a fresh connection.

        Reads "<tag> <name>" from the peer. The peer is rejected with "!tag"
        if it is joining a different barrier, and with "!dup" if the name is
        already waiting. Otherwise it receives "wait" and is recorded in
        self._waiting.

        On rejection, invalid data or a handshake timeout, the socket is
        closed and the peer is not counted.

        @param connection: (socket, (host, port)) tuple for the peer.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            # Get the client's name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                # Use the known address rather than getpeername(), which can
                # itself raise if the peer has already gone away.
                logging.warning("Ignoring invalid data from %s: %r",
                                addr, intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach. Confirm that
            # they are coming to the same meeting. Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning(
                    "client arriving for the wrong barrier: %s != %s",
                    self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid; record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """
        Run the slave side of the handshake on a fresh connection.

        Sends "<tag> <hostid>" and expects "wait" back. On success, the
        connection is stored in self._waiting under our own hostid and
        self._seen is set to 1. On any other reply or a handshake timeout,
        the socket is closed.

        @param connection: (socket, (host, port)) tuple for the master.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid; record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """
        Confirm every waiting member is alive, then release or abort them.

        Each member is sent "ping" and must answer "pong", or "abrt" to
        request an abort. If everyone answers, each member is sent "rlse",
        or "abrt" if anyone (including this host) requested an abort.

        Failures while sending that final message are only logged, because
        the barrier has already been met at that point.

        @raises error.BarrierError: if any member failed to answer the ping.
        @raises BarrierAbortError: if an abort was requested.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name, (client, addr) in self._waiting.items():
            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)
            except socket.error:
                # A reset/broken connection means the member is gone: treat
                # it like a missing pong instead of crashing the master.
                logging.warning("ping/pong connection error: %s", name)

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # Everyone checked in, so commit the release (best effort).
        for name, (client, addr) in self._waiting.items():
            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error:
                logging.warning("release failed: %s", name)

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """
        Close every recorded connection.

        Members that were not released see the connection drop and know to
        abort.
        """
        for name, (client, addr) in self._waiting.items():
            logging.info("closing client: %s", name)

            try:
                client.close()
            except socket.error:
                # Best effort: a failure here must not mask the barrier's
                # own outcome (this runs during cleanup).
                pass


    def _run_server(self, is_master):
        """
        Accept connections on the listening socket until the barrier resolves.

        @param is_master: True to act as master, welcoming every member and
                releasing them once all are present. False to act as a slave
                waiting for the master to connect (rendezvous_servers mode).
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers, welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()
            # If we created the listen_server at the beginning of this
            # function, then close the listening socket here.
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """
        Connect out until the barrier resolves.

        @param is_master: True to act as master, connecting to each member
                in turn (rendezvous_servers mode). False to act as a slave
                connecting to the master.

        Refused connections and connect timeouts are retried every 10
        seconds until the barrier deadline. Any other socket error is
        re-raised. Recorded connections are always closed on exit.
        """
        try:
            while True:
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                # Resolve the target before creating the socket so that an
                # invalid member id cannot leak it.
                if is_master:
                    # Connect to all slaves.
                    host = self._get_host_from_id(self._members[self._seen])
                    logging.info("calling slave: %s", host)
                else:
                    # Just connect to the master.
                    host = self._get_host_from_id(self._masterid)
                    logging.info("calling master")

                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.settimeout(30)
                connection = (remote, (host, self._port))
                try:
                    remote.connect(connection[1])
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout calling host, retry")
                    remote.close()
                    sleep(10)
                except socket.error:
                    # sys.exc_info() keeps this valid on old and new Python
                    # alike; not every socket.error carries (errno, msg).
                    err = sys.exc_info()[1]
                    code = None
                    if err.args:
                        code = err.args[0]
                    remote.close()
                    if code != errno.ECONNREFUSED:
                        raise
                    sleep(10)

                if is_master:
                    # Check if everyone is here.
                    logging.info("master seen %d of %d",
                                 self._seen, len(self._members))
                    if self._seen == len(self._members):
                        self._master_release()
                        break
                else:
                    # Check if master connected.
                    if self._seen:
                        logging.info("slave connected to master")
                        self._slave_wait()
                        break
        finally:
            self._waiting_close()


    def _slave_wait(self):
        """
        As a slave, wait on the established connection for the master's verdict.

        Each "ping" is answered with "pong", or "abrt" if this member
        requested an abort. Returns normally once the master closes the
        connection after "rlse". Any other final state becomes an exception.

        @raises error.BarrierError: if the connection ended in a state other
                than released.
        @raises BarrierAbortError: if the master sent "abrt".
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            pass
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """
        Wait until every listed member reaches this barrier.

        The member ids are sorted, and the first one becomes the master.
        The master listens for the others, and the rest connect to it.

        @param hosts: Member ids (hostname/IP + optional '#tag') taking part,
                normally including this host's own id.
        @param abort: (keyword) If True, every member raises
                BarrierAbortError instead of being released.

        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any member requested an abort.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """
        Like rendezvous(), but with the connection direction reversed.

        Every non-master member listens, and masterid connects out to each
        of them in turn. This suits setups where connections can only be
        initiated from the master's side.

        @param masterid: Member id of the master (the connecting side).
        @param hosts: Member ids of the other (listening) members.
        @param abort: (keyword) as for rendezvous().

        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any member requested an abort.
        """
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not self._members:
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)