apw | 2db3931 | 2006-10-06 12:36:34 +0000 | [diff] [blame] | 1 | __author__ = """Copyright Andy Whitcroft 2006""" |
| 2 | |
mbligh | fadca20 | 2006-09-23 04:40:01 +0000 | [diff] [blame] | 3 | import sys |
| 4 | import socket |
| 5 | import errno |
| 6 | from time import time, sleep |
| 7 | |
| 8 | import error |
| 9 | |
class BarrierError(error.JobError):
    """Raised when a barrier cannot be completed: the overall timeout
    expired, a client was lost or rejected, or the handshake with the
    master failed."""
| 12 | |
class barrier:
    """ Multi-machine barrier support

    Provides a multi-machine barrier mechanism: execution
    stops until all members arrive at the barrier.

    When a barrier is forming the master node (first in sort
    order) in the set accepts connections from each member
    of the set.  As they arrive they indicate the barrier
    they are joining and their identifier (their hostname
    or IP address and optional tag).  They are then asked
    to wait.  When all members are present the master node
    then checks that each member is still responding via a
    ping/pong exchange.  If this is successful then everyone
    has checked in at the barrier.  We then tell everyone
    they may continue via a rlse message.

    Where the master is not the first to reach the barrier
    the clients' connection attempts will fail.  Clients retry
    until they either succeed in connecting to the master or
    the overall timeout is exceeded.

    As an example here is the exchange for a three node
    barrier called 'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded with pong the
    barrier is implicitly deemed satisfied: they have all
    acknowledged their presence.  If we fail to send any
    of the rlse messages the barrier is still a success;
    the failed host has effectively broken 'right at the
    beginning' of the post barrier execution window.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendevous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendevous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Properties:
        hostid
            My hostname/IP address + optional tag (separated by '#')
        tag
            Symbolic name of the barrier in progress
        port
            TCP port used for this barrier
        timeout
            Maximum time to wait for the barrier to meet
        start
            Timestamp when we started waiting
        members
            All members we expect to find in the barrier
        seen
            Number of members seen so far, counting the master
            itself (master)
        waiting
            Clients who have checked in and are waiting (master)
    """

    def __init__(self, hostid, tag, timeout, port=63000):
        """Create a barrier handle; the timeout clock starts now.

        hostid:  this member's identity, as it appears in the member
                 list later passed to rendevous().
        tag:     symbolic barrier name; all members must use the same one.
        timeout: overall number of seconds allowed for the barrier to form.
        port:    TCP port the master listens on and clients connect to.
        """
        self.hostid = hostid
        self.tag = tag
        self.port = port
        self.timeout = timeout
        self.start = time()

        self.report("tag=%s port=%d timeout=%d start=%d"
                    % (self.tag, self.port, self.timeout, self.start))

    def report(self, out):
        """Print a progress line tagged with our hostid and flush it."""
        # sys.stdout.write produces the same text as the old Python 2
        # print statement and is also valid Python 3.
        sys.stdout.write("barrier: %s %s\n" % (self.hostid, out))
        sys.stdout.flush()

    def update_timeout(self, timeout):
        """Move the deadline to `timeout` seconds from now.

        This may shorten the deadline as well as extend it.
        """
        self.timeout = (time() - self.start) + timeout

    def remaining(self):
        """Return the seconds left before the deadline.

        Raises BarrierError once the deadline has passed, so the return
        value is always positive.
        """
        timeout = self.timeout - (time() - self.start)
        if timeout <= 0:
            raise BarrierError("timeout waiting for barrier")

        self.report("remaining: %d" % (timeout))
        return timeout

    def master_welcome(self, connection):
        """Handshake with a newly accepted client and record it as waiting.

        connection is the (socket, address) pair returned by accept().
        The client must send "<tag> <name>".  On success it is answered
        with "wait", stored in self.waiting and counted in self.seen.  A
        client for another barrier gets "!tag", and a name that has already
        checked in gets "!dup"; both are then disconnected.  Clients that
        send a malformed introduction, time out or fail at the socket level
        are reported and disconnected.  Such a client is never fatal here;
        the overall barrier timeout decides whether the barrier fails.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            # Get the client's name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            # Validate before unpacking: an empty or malformed intro used
            # to raise ValueError here and abort the whole barrier.
            fields = intro.split(' ')
            if len(fields) != 2:
                self.report("malformed client introduction %r: (%s:%d)"
                            % (intro, addr[0], addr[1]))
                client.close()
                return
            (tag, name) = fields

            self.report("new client tag=%s, name=%s" % (tag, name))

            # Ok, we know who is trying to attach.  Confirm that
            # they are coming to the same meeting.  Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self.tag != tag:
                self.report("client arriving for the wrong barrier")
                client.send("!tag")
                client.close()
                return
            elif name in self.waiting:
                self.report("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client.
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            self.report("client handshake timeout: (%s:%d)"
                        % (addr[0], addr[1]))
            client.close()
            return
        except socket.error:
            # Same reasoning as a timeout: drop the client, keep going.
            self.report("client handshake failed: (%s:%d)"
                        % (addr[0], addr[1]))
            client.close()
            return

        self.report("client now waiting: %s (%s:%d)"
                    % (name, addr[0], addr[1]))

        # They seem to be valid, record them.
        self.waiting[name] = connection
        self.seen += 1

    def master_release(self):
        """Confirm every waiting client is alive, then release them all.

        Each client is sent "ping" and must answer "pong" within 5 seconds.
        If any client fails, BarrierError is raised and nobody is released.
        Otherwise "rlse" is sent to every client.  A failure to deliver it
        is only reported, because the barrier has already been met.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        for name, (client, addr) in self.waiting.items():
            self.report("checking client present: " + name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                self.report("ping/pong timeout: " + name)
            except socket.error:
                # A vanished client can show up as a socket error
                # (e.g. reset or broken pipe) rather than a timeout.
                self.report("ping/pong failed: " + name)

            if reply != "pong":
                allpresent = False

        if not allpresent:
            raise BarrierError("master lost client")

        # If everyone checks in then commit the release.
        for name, (client, addr) in self.waiting.items():
            self.report("releasing client: " + name)

            client.settimeout(5)
            try:
                client.send("rlse")
            except socket.timeout:
                self.report("release timeout: " + name)
            except socket.error:
                self.report("release failed: " + name)

    def master_close(self):
        """Close every waiting client connection and the listening socket."""
        # Either way, close out all the clients.  If we have
        # not released them then they know to abort.
        for name, (client, addr) in self.waiting.items():
            self.report("closing client: " + name)
            try:
                client.close()
            except socket.error:
                # Best effort: a failure to close one client must not
                # stop us closing the rest.
                pass

        # And finally close out our server socket.
        self.server.close()

    def master(self):
        """Run the barrier as master: gather all members, then release them.

        Listens on self.port until every member has checked in (the master
        counts itself) and then performs the ping/pong/rlse exchange.  All
        client sockets and the listening socket are closed whether or not
        the barrier succeeds.

        Raises BarrierError if the overall timeout expires or a client is
        lost before release.
        """
        self.report("selected as master")

        self.seen = 1
        self.waiting = {}
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server.setsockopt(socket.SOL_SOCKET,
                                   socket.SO_REUSEADDR, 1)
            self.server.bind(('', self.port))
            self.server.listen(10)

            # Check whether everyone is here *before* blocking in accept(),
            # so a barrier whose only member is the master completes
            # immediately instead of waiting out the whole timeout.
            while True:
                self.report("master seen %d of %d"
                            % (self.seen, len(self.members)))
                if self.seen >= len(self.members):
                    break

                try:
                    # Wait for callers, welcoming each.  remaining()
                    # raises BarrierError once the deadline has passed.
                    self.server.settimeout(self.remaining())
                    connection = self.server.accept()
                    self.master_welcome(connection)
                except socket.timeout:
                    self.report("timeout waiting for remaining clients")

            self.master_release()
        finally:
            self.master_close()

    def _connect_master(self, master):
        """Connect to master:self.port, retrying until the deadline passes.

        Each attempt uses a fresh socket.  After a failed or timed-out
        connect() the socket state is unspecified (POSIX), so it must not
        be reused.
        """
        while True:
            # Raises BarrierError once the overall deadline has passed.
            self.remaining()

            remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            remote.settimeout(30)
            try:
                self.report("calling master")
                remote.connect((master, self.port))
                return remote
            except socket.timeout:
                remote.close()
                self.report("timeout calling master, retry")
                sleep(10)
            except socket.error:
                # sys.exc_info() keeps this valid on every Python version.
                err = sys.exc_info()[1]
                remote.close()
                # Only "connection refused" (master not listening yet) is
                # worth retrying; anything else is a real failure.
                if not err.args or err.args[0] != errno.ECONNREFUSED:
                    raise
                self.report("master not listening yet, retry")
                sleep(10)

    def _follow_master(self, remote):
        """Introduce ourselves and process master messages until close.

        Returns the last message received from the master ("none" if
        nothing arrived).
        """
        remote.settimeout(self.remaining())
        remote.send(self.tag + " " + self.hostid)

        mode = "none"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self.remaining())
            try:
                reply = remote.recv(4)
            except socket.timeout:
                if mode == "rlse":
                    # Already released: the barrier has been met even if
                    # the master is slow to close the connection.
                    break
                raise BarrierError("timeout waiting for barrier")
            if not reply:
                # Master closed the connection.
                break

            reply = reply.strip("\r\n")
            self.report("master said: " + reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self.update_timeout(10 * len(self.members))

                self.report("pong")
                remote.settimeout(self.remaining())
                remote.send("pong")

            elif reply == "rlse":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self.update_timeout(10 * len(self.members))

                self.report("was released, waiting for close")

        return mode

    def slave(self):
        """Run the barrier as a client of the master (self.members[0]).

        Raises BarrierError unless the master released us with "rlse"
        before closing the connection.
        """
        # Clip out the master host in the barrier, remove any
        # trailing local identifier following a #.  This allows
        # multiple members per host which is particularly helpful
        # in testing.
        master = self.members[0].split('#')[0]

        self.report("selected as slave, master=" + master)

        remote = self._connect_master(master)
        try:
            mode = self._follow_master(remote)
        finally:
            remote.close()

        if mode == "rlse":
            return

        # The last message seen before the master hung up tells us which
        # stage of the protocol failed.
        failures = {
            "wait": "master abort -- barrier timeout",
            "ping": "master abort -- client lost",
            "!tag": "master abort -- incorrect tag",
            "!dup": "master abort -- duplicate client",
        }
        raise BarrierError(failures.get(mode,
                                        "master handshake failure: " + mode))

    def rendevous(self, *hosts):
        """Block until every host in `hosts` has reached this barrier.

        `hosts` should list every member, including this host's hostid.
        The member that sorts first acts as master; the others connect to
        it.  Raises BarrierError if no hosts are given, on timeout, or if
        the barrier is broken.
        """
        if not hosts:
            raise BarrierError("no barrier members given")

        self.members = sorted(hosts)

        self.report("members: " + ",".join(self.members))

        # Figure out who is the master in this barrier.
        if self.hostid == self.members[0]:
            self.master()
        else:
            self.slave()
| 345 | |
| 346 | |
#
# TESTING -- direct test harness.
#
# For example, run in parallel:
#     python bin/barrier.py 1 meeting
#     python bin/barrier.py 2 meeting
#     python bin/barrier.py 3 meeting
#
# Each process joins the barrier named by the second argument as member
# 127.0.0.1#<first argument>, where the first argument should be 1, 2 or 3.
# All three must be started within the 60 second barrier timeout.
#
if __name__ == "__main__":
    if len(sys.argv) != 3:
        sys.stderr.write("usage: %s <member-number> <barrier-tag>\n"
                         % sys.argv[0])
        sys.exit(2)

    # Use distinct local names so the `barrier` class and the builtin
    # `all` are not shadowed.
    me = '127.0.0.1#' + sys.argv[1]
    meeting = barrier(me, sys.argv[2], 60)

    try:
        members = ['127.0.0.1#2', '127.0.0.1#1', '127.0.0.1#3']
        meeting.rendevous(*members)
    except BarrierError:
        err = sys.exc_info()[1]
        sys.stdout.write("barrier: %s: barrier failed: %s\n" % (me, err))
        sys.exit(1)
    else:
        sys.stdout.write("barrier: %s: all present and accounted for\n"
                         % me)
        sys.exit(0)