blob: 1dad1561ebb6758c4dfc480e4bc8240266960f80 [file] [log] [blame]
apw2db39312006-10-06 12:36:34 +00001__author__ = """Copyright Andy Whitcroft 2006"""
2
mblighfadca202006-09-23 04:40:01 +00003import sys
4import socket
5import errno
6from time import time, sleep
7
8import error
9
class BarrierError(error.JobError):
    """Raised when a barrier cannot be met (timeout, lost or rejected member)."""
12
class barrier:
    """ Multi-machine barrier support

    Provides a multi-machine barrier mechanism: execution
    stops until all members arrive at the barrier.

    When a barrier is forming, the master node (first in sort
    order) in the set accepts connections from each member
    of the set. As they arrive they indicate the barrier
    they are joining and their identifier (their hostname
    or IP address and optional tag). They are then asked
    to wait. When all members are present the master node
    checks that each member is still responding via a
    ping/pong exchange. If this is successful then everyone
    has checked in at the barrier. We then tell everyone
    they may continue via a rlse message.

    Where the master is not the first to reach the barrier,
    client connection attempts will fail. Clients retry until
    they either succeed in connecting to the master or the
    overall timeout is exceeded.

    A caller announcing a different barrier tag is answered
    with '!tag', and a caller reusing an identifier that is
    already waiting is answered with '!dup'; both are then
    dropped and do not count towards the barrier.

    As an example here is the exchange for a three node
    barrier called 'TAG'

     MASTER                           CLIENT1        CLIENT2
       <-------------TAG C1-------------
       --------------wait-------------->
                    [...]
       <-------------TAG C2-----------------------------
       --------------wait------------------------------>
                    [...]
       --------------ping-------------->
       <-------------pong---------------
       --------------ping------------------------------>
       <-------------pong-------------------------------
               ----- BARRIER conditions MET -----
       --------------rlse-------------->
       --------------rlse------------------------------>

    Note that once the last client has answered its ping with
    a pong, the barrier is implicitly deemed satisfied: they
    have all acknowledged their presence. If we fail to send
    any of the rlse messages the barrier is still a success;
    the failed host has effectively broken 'right at the
    beginning' of the post-barrier execution window.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendevous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendevous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Properties:
        hostid
            My hostname/IP address + optional tag
        tag
            Symbolic name of the barrier in progress
        port
            TCP port used for this barrier
        timeout
            Maximum time to wait for the barrier to meet
        start
            Timestamp when we started waiting
        members
            All members we expect to find in the barrier
        seen
            Number of clients seen (master)
        waiting
            Clients who have checked in and are waiting (master)
    """

    # Every control message (wait/ping/pong/rlse/!tag/!dup) is this long.
    _CONTROL_LEN = 4

    def __init__(self, hostid, tag, timeout, port=63000):
        self.hostid = hostid
        self.tag = tag
        self.port = port
        self.timeout = timeout
        self.start = time()

        self.report("tag=%s port=%d timeout=%d start=%d"
                    % (self.tag, self.port, self.timeout, self.start))

    def report(self, out):
        """Write a progress line prefixed with "barrier:" and our hostid."""
        # Same text the old `print "barrier:", hostid, out` produced, but
        # written so the module also parses under Python 3.  Flushed right
        # away so progress is visible while we block on the network.
        sys.stdout.write("barrier: %s %s\n" % (self.hostid, out))
        sys.stdout.flush()

    def update_timeout(self, timeout):
        """Move the deadline so that `timeout` seconds remain from now."""
        self.timeout = (time() - self.start) + timeout

    def remaining(self):
        """Return the seconds left before the deadline.

        Raises BarrierError once the deadline has passed.
        """
        timeout = self.timeout - (time() - self.start)
        if timeout <= 0:
            raise BarrierError("timeout waiting for barrier")

        self.report("remaining: %d" % (timeout))
        return timeout

    def _recv_control(self, sock):
        """Read one fixed-size control message from `sock`.

        TCP is a byte stream, so a single recv() may return fewer bytes
        than the peer sent.  Keep reading until a whole message has
        arrived or the peer closes.  Returns "" if the peer closed before
        sending anything, or the short fragment if it closed mid-message.
        Socket errors/timeouts propagate to the caller.
        """
        data = ""
        while len(data) < self._CONTROL_LEN:
            chunk = sock.recv(self._CONTROL_LEN - len(data))
            if not chunk:
                break
            data += chunk
        return data

    def master_welcome(self, connection):
        """Handshake with a newly accepted caller and record it if valid.

        `connection` is the (socket, address) pair returned by accept().
        The caller is expected to send "<tag> <name>".  A caller for a
        different barrier is told "!tag", one whose name is already waiting
        is told "!dup", and a malformed or failed handshake is dropped; all
        of those are closed and not counted.  A valid caller is told "wait"
        and recorded in self.waiting, and self.seen is incremented.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            # Get the client's name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            try:
                (tag, name) = intro.split(' ')
            except ValueError:
                # An empty read (the caller hung up) or garbage must not
                # take the whole barrier down; drop just this caller.
                self.report("malformed client introduction: %r (%s:%d)"
                            % (intro, addr[0], addr[1]))
                client.close()
                return

            self.report("new client tag=%s, name=%s" % (tag, name))

            # Ok, we know who is trying to attach. Confirm that
            # they are coming to the same meeting. Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self.tag != tag:
                self.report("client arriving for the wrong barrier")
                client.sendall("!tag")
                client.close()
                return
            elif name in self.waiting:
                self.report("duplicate client")
                client.sendall("!dup")
                client.close()
                return

            # Acknowledge the client.
            client.sendall("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            self.report("client handshake timeout: (%s:%d)"
                        % (addr[0], addr[1]))
            client.close()
            return
        except socket.error:
            # E.g. the connection was reset mid-handshake; treat it like a
            # timeout rather than aborting the whole barrier.
            self.report("client handshake failed: (%s:%d)"
                        % (addr[0], addr[1]))
            client.close()
            return

        self.report("client now waiting: %s (%s:%d)"
                    % (name, addr[0], addr[1]))

        # They seem to be valid; record them.
        self.waiting[name] = connection
        self.seen += 1

    def master_release(self):
        """Ping every waiting client, then release them all with "rlse".

        Raises BarrierError if any client fails to answer "pong".  Failure
        to deliver "rlse" is only reported: once every pong is in, the
        barrier has been met (see the class docstring).
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        for name in self.waiting:
            (client, addr) = self.waiting[name]

            self.report("checking client present: " + name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.sendall("ping")
                reply = self._recv_control(client)
            except socket.timeout:
                self.report("ping/pong timeout: " + name)
            except socket.error:
                # A reset/broken connection means the client is gone.
                self.report("ping/pong failed: " + name)

            if reply != "pong":
                allpresent = False

        if not allpresent:
            raise BarrierError("master lost client")

        # If everyone checks in then commit the release.
        for name in self.waiting:
            (client, addr) = self.waiting[name]

            self.report("releasing client: " + name)

            client.settimeout(5)
            try:
                client.sendall("rlse")
            except socket.timeout:
                self.report("release timeout: " + name)
            except socket.error:
                # Still a success: the host broke after the barrier was met.
                self.report("release failed: " + name)

    def master_close(self):
        """Close every waiting client connection and the listening socket."""
        # Either way, close out all the clients. If we have
        # not released them then they know to abort.
        for name in self.waiting:
            (client, addr) = self.waiting[name]

            self.report("closing client: " + name)

            try:
                client.close()
            except socket.error:
                # Best effort only; keep closing the rest.
                pass

        # And finally close out our server socket.
        self.server.close()

    def master(self):
        """Run the barrier as master: gather all members, then release them.

        Raises BarrierError on timeout or if a client is lost before release.
        All sockets are closed on every exit path.
        """
        self.report("selected as master")

        self.seen = 1
        self.waiting = {}
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.server.setsockopt(socket.SOL_SOCKET,
                                   socket.SO_REUSEADDR, 1)
            self.server.bind(('', self.port))
            self.server.listen(10)

            # Check before each accept so that a barrier that is already
            # complete (e.g. the master is the only member) does not sit in
            # accept() until the timeout expires.
            while True:
                self.report("master seen %d of %d"
                            % (self.seen, len(self.members)))
                if self.seen >= len(self.members):
                    break
                try:
                    # Wait for callers; remaining() raises BarrierError
                    # once the overall deadline has passed.
                    self.server.settimeout(self.remaining())
                    connection = self.server.accept()
                except socket.timeout:
                    self.report("timeout waiting for remaining clients")
                    continue
                self.master_welcome(connection)

            self.master_release()
        finally:
            self.master_close()

    def _connect_master(self, master):
        """Return a socket connected to `master`, retrying until it listens.

        A fresh socket is used for every attempt: after a failed or
        timed-out connect() the socket's state is unspecified, and reusing
        it typically fails (e.g. with EALREADY).  Connection refusals and
        timeouts are retried every 10s; any other socket error propagates.
        Raises BarrierError (from remaining()) once the deadline passes.
        """
        while True:
            # remaining() raises BarrierError once the deadline has passed.
            self.remaining()
            remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            remote.settimeout(30)
            try:
                self.report("calling master")
                remote.connect((master, self.port))
                return remote
            except socket.timeout:
                remote.close()
                self.report("timeout calling master, retry")
            except socket.error:
                remote.close()
                err = sys.exc_info()[1]
                # The master is not listening yet; anything else is fatal.
                if not err.args or err.args[0] != errno.ECONNREFUSED:
                    raise
            sleep(10)

    def slave(self):
        """Run the barrier as a client of the master (members[0]).

        Returns once the master has released us; raises BarrierError if the
        master aborts or rejects us, or the overall timeout expires.
        """
        # Clip out the master host in the barrier, remove any
        # trailing local identifier following a #. This allows
        # multiple members per host which is particularly helpful
        # in testing.
        master = (self.members[0].split('#'))[0]

        self.report("selected as slave, master=" + master)

        remote = self._connect_master(master)
        mode = "none"
        try:
            remote.settimeout(self.remaining())
            remote.sendall(self.tag + " " + self.hostid)

            while True:
                # All control messages are the same size to allow
                # us to split individual messages easily.
                remote.settimeout(self.remaining())
                try:
                    reply = self._recv_control(remote)
                except socket.timeout:
                    # The wait was bounded by remaining(); loop so that
                    # remaining() raises BarrierError, as master() does,
                    # instead of leaking a raw socket.timeout.
                    continue
                if not reply:
                    break

                reply = reply.strip("\r\n")
                self.report("master said: " + reply)

                mode = reply
                if reply == "ping":
                    # Ensure we have sufficient time for the
                    # ping/pong/rlse cycle to complete normally.
                    self.update_timeout(10 * len(self.members))

                    self.report("pong")
                    remote.settimeout(self.remaining())
                    remote.sendall("pong")

                elif reply == "rlse":
                    # Ensure we have sufficient time for the
                    # ping/pong/rlse cycle to complete normally.
                    self.update_timeout(10 * len(self.members))

                    self.report("was released, waiting for close")
        finally:
            remote.close()

        # The master has closed the connection; the last message we saw
        # tells us how far the barrier got.
        if mode == "rlse":
            pass
        elif mode == "wait":
            raise BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise BarrierError("master abort -- duplicate client")
        else:
            raise BarrierError("master handshake failure: " + mode)

    def rendevous(self, *hosts):
        """Wait at the barrier until every host in `hosts` has arrived.

        (The misspelt name is the public API and is kept as-is.)  `hosts`
        are member identifiers in the same form as our hostid; the first in
        sort order acts as master, everyone else connects to it.
        """
        self.members = sorted(hosts)

        self.report("members: " + ",".join(self.members))

        # Figure out who is the master in this barrier.
        if self.hostid == self.members[0]:
            self.master()
        else:
            self.slave()
345
346
#
# TESTING -- direct test harness.
#
# For example, run in parallel:
#   python bin/barrier.py 1 meeting
#   python bin/barrier.py 2 meeting
#   python bin/barrier.py 3 meeting
#
if __name__ == "__main__":
    if len(sys.argv) < 3:
        sys.stderr.write("usage: %s <member 1-3> <barrier-tag>\n"
                         % sys.argv[0])
        sys.exit(2)

    me = '127.0.0.1#' + sys.argv[1]
    # Use a distinct name so the `barrier` class is not shadowed.
    meeting = barrier(me, sys.argv[2], 60)

    try:
        members = ['127.0.0.1#2', '127.0.0.1#1', '127.0.0.1#3']
        meeting.rendevous(*members)
    except BarrierError:
        err = sys.exc_info()[1]
        sys.stdout.write("barrier: %s: barrier failed: %s\n" % (me, err))
        sys.exit(1)
    else:
        sys.stdout.write("barrier: %s: all present and accounted for\n"
                         % me)
        sys.exit(0)