barrier cleanups:
* Rename barrier.py to base_barrier.py and add a barrier.py
stub that imports from base_barrier and overrides it with site_barrier
if found. barrier_unittest.py is renamed to match.
* Move BarrierAbortError to the error module with everything else.
* Add a rendezvous_servers abort=True from the server test case.
* Move get_sync_control_file() from common_lib.utils to
server.base_utils, where it belongs, to avoid a circular
import (utils importing barrier).
Signed-off-by: Gregory Smith <gps@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@4444 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/client/common_lib/base_barrier.py b/client/common_lib/base_barrier.py
new file mode 100644
index 0000000..e4de635
--- /dev/null
+++ b/client/common_lib/base_barrier.py
@@ -0,0 +1,543 @@
+import sys, socket, errno, logging
+from time import time, sleep
+from autotest_lib.client.common_lib import error
+
+ # Default TCP port for barrier traffic; used when the caller supplies
+ # neither an explicit port nor a listen_server instance.
+ _DEFAULT_PORT = 11922
+
+
+ class BarrierAbortError(error.BarrierError):
+     """Special BarrierError raised when an explicit abort is requested.
+
+     Raised by the master once it has sent 'abrt' to the waiting clients,
+     and by a slave whose last message from the master was 'abrt'.
+     """
+
+
+ class listen_server(object):
+     """
+     Manages a listening socket for barrier.
+
+     Can be used to run multiple barrier instances with the same listening
+     socket (if they were going to listen on the same port).
+
+     Attributes:
+
+     @attr address: Address to bind to (string).
+     @attr port: Port to bind to.
+     @attr socket: Listening socket object.
+     """
+     def __init__(self, address='', port=_DEFAULT_PORT):
+         """
+         Create a listen_server instance for the given address/port.
+
+         The socket is bound and put into listening mode immediately.
+
+         @param address: The address to listen on.
+         @param port: The port to listen on.
+         @raise socket.error: If the socket cannot be bound or listened on.
+         """
+         self.address = address
+         self.port = port
+         self.socket = self._setup()
+
+
+     def _setup(self):
+         """
+         Create, bind and listen on the listening socket.
+
+         @return The bound socket, listening with a backlog of 10.
+         @raise socket.error: If binding or listening fails; the partially
+                 configured socket is closed before the error propagates.
+         """
+         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+         try:
+             sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+             sock.bind((self.address, self.port))
+             sock.listen(10)
+         except:
+             # Cleanup only: release the descriptor instead of leaking it,
+             # then let the original exception continue unchanged.
+             sock.close()
+             raise
+
+         return sock
+
+
+     def close(self):
+         """Close the listening socket."""
+         self.socket.close()
+
+
+class barrier(object):
+ """Multi-machine barrier support.
+
+ Provides multi-machine barrier mechanism.
+ Execution stops until all members arrive at the barrier.
+
+ Implementation Details:
+ .......................
+
+ When a barrier is forming the master node (first in sort order) in the
+ set accepts connections from each member of the set. As they arrive
+ they indicate the barrier they are joining and their identifier (their
+ hostname or IP address and optional tag). They are then asked to wait.
+ When all members are present the master node then checks that each
+ member is still responding via a ping/pong exchange. If this is
+ successful then everyone has checked in at the barrier. We then tell
+ everyone they may continue via a rlse message.
+
+ Where the master is not the first to reach the barrier the clients'
+ connection attempts will fail. Clients will retry until they either
+ succeed in connecting to the master or the overall timeout is exceeded.
+
+ As an example here is the exchange for a three node barrier called
+ 'TAG'
+
+ MASTER CLIENT1 CLIENT2
+ <-------------TAG C1-------------
+ --------------wait-------------->
+ [...]
+ <-------------TAG C2-----------------------------
+ --------------wait------------------------------>
+ [...]
+ --------------ping-------------->
+ <-------------pong---------------
+ --------------ping------------------------------>
+ <-------------pong-------------------------------
+ ----- BARRIER conditions MET -----
+ --------------rlse-------------->
+ --------------rlse------------------------------>
+
+ Note that once the last client has responded with pong the barrier is
+ implicitly deemed satisfied, they have all acknowledged their presence.
+ If we fail to send any of the rlse messages the barrier is still a
+ success, the failed host has effectively broken 'right at the beginning'
+ of the post barrier execution window.
+
+ In addition, there is another rendezvous mode that makes each slave a
+ server and the master a client. The connection process and usage are
+ still the same, but it allows barriers between machines where connections
+ can only be initiated in one direction. This is called rendezvous_servers.
+
+ For example:
+ if ME == SERVER:
+ server start
+
+ b = job.barrier(ME, 'server-up', 120)
+ b.rendezvous(CLIENT, SERVER)
+
+ if ME == CLIENT:
+ client run
+
+ b = job.barrier(ME, 'test-complete', 3600)
+ b.rendezvous(CLIENT, SERVER)
+
+ if ME == SERVER:
+ server stop
+
+ Any client can also request an abort of the job by setting
+ abort=True in the rendezvous arguments.
+ """
+
+     def __init__(self, hostid, tag, timeout=None, port=None,
+                  listen_server=None):
+         """
+         Set up the state for one barrier; no network activity happens here.
+
+         @param hostid: My hostname/IP address + optional tag.
+         @param tag: Symbolic name of the barrier in progress.
+         @param timeout: Maximum seconds to wait for the barrier to meet.
+         @param port: Port number to listen on.
+         @param listen_server: External listen_server instance to use instead
+                 of creating our own. Create a listen_server instance and
+                 reuse it across multiple barrier instances so that the
+                 barrier code doesn't try to quickly re-bind on the same port
+                 (packets still in transit for the previous barrier may
+                 reset new connections).
+         @raise error.BarrierError: If both port and listen_server are given.
+         """
+         self._hostid = hostid
+         self._tag = tag
+
+         # The port comes from the shared listener when there is one,
+         # otherwise from the argument, otherwise from the module default.
+         if not listen_server:
+             self._port = port or _DEFAULT_PORT
+         elif port:
+             raise error.BarrierError(
+                     '"port" and "listen_server" are mutually exclusive.')
+         else:
+             self._port = listen_server.port
+
+         # A listen_server instance or None.
+         self._server = listen_server
+         # List of hosts we expect to find at the barrier.
+         self._members = []
+         self._timeout_secs = timeout
+         # Timestamp of when we started waiting.
+         self._start_time = None
+         # Host/IP + optional tag of selected master.
+         self._masterid = None
+         logging.info("tag=%s port=%d timeout=%r",
+                      self._tag, self._port, self._timeout_secs)
+
+         # Number of clients seen (should be the length of self._waiting).
+         self._seen = 0
+
+         # Clients who have checked in and are waiting (if we are a master).
+         # Maps from hostname -> (client, addr) tuples.
+         self._waiting = {}
+
+
+     def _get_host_from_id(self, hostid):
+         """
+         Return the host address portion of a host id.
+
+         Anything from the first '#' onwards is a local identifier and is
+         dropped. This allows multiple members per host, which is
+         particularly helpful in testing.
+
+         @param hostid: Host address, optionally followed by '#' and a tag.
+         @return The text preceding the first '#'.
+         @raise error.BarrierError: If hostid starts with '#', i.e. carries
+                 no host address.
+         """
+         if hostid.startswith('#'):
+             raise error.BarrierError(
+                     "Invalid Host id: Host Address should be specified")
+         return hostid.split('#')[0]
+
+
+     def _update_timeout(self, timeout):
+         """
+         Reset the overall timeout so that it expires `timeout` seconds
+         from now.
+
+         The stored value is relative to self._start_time, so the time
+         already elapsed is added on. If there is no start time yet, or
+         timeout is None, the value is stored unchanged.
+
+         @param timeout: Seconds from now, or None for no timeout.
+         """
+         if timeout is None or self._start_time is None:
+             self._timeout_secs = timeout
+             return
+
+         elapsed = time() - self._start_time
+         self._timeout_secs = elapsed + timeout
+
+
+     def _remaining(self):
+         """
+         Return the number of seconds left before the barrier times out.
+
+         @return None when no timeout is configured; the full timeout when
+                 waiting has not started yet; otherwise the timeout minus
+                 the time elapsed since self._start_time.
+         @raise error.BarrierError: If the timeout has already expired.
+         """
+         if self._timeout_secs is None:
+             return None
+
+         if self._start_time is None:
+             timeout = self._timeout_secs
+         else:
+             timeout = self._timeout_secs - (time() - self._start_time)
+             if timeout <= 0:
+                 errmsg = "timeout waiting for barrier: %s" % self._tag
+                 # Log the message itself, not the imported error module.
+                 logging.error(errmsg)
+                 raise error.BarrierError(errmsg)
+
+         logging.info("seconds remaining: %d", timeout)
+         return timeout
+
+
+     def _master_welcome(self, connection):
+         """
+         Master side of the handshake with one newly connected client.
+
+         Reads the client's "<tag> <name>" introduction. A client with the
+         wrong tag is sent '!tag' and one whose name is already waiting is
+         sent '!dup'; both are then closed. A valid client is sent 'wait',
+         recorded in self._waiting and counted in self._seen.
+
+         @param connection: A (socket, address) tuple for the client.
+         """
+         sock, peer = connection
+
+         sock.settimeout(5)
+         try:
+             # Get the clients name.
+             hello = sock.recv(1024).strip("\r\n")
+
+             fields = hello.split(' ', 2)
+             if len(fields) != 2:
+                 logging.warn("Ignoring invalid data from %s: %r",
+                              sock.getpeername(), hello)
+                 sock.close()
+                 return
+             tag, name = fields
+
+             logging.info("new client tag=%s, name=%s", tag, name)
+
+             # Ok, we know who is trying to attach. Confirm that they are
+             # coming to the same meeting. Also, everyone should be using a
+             # unique handle (their IP address). If we see a duplicate,
+             # something _bad_ has happened so drop them now.
+             refusal = None
+             if tag != self._tag:
+                 logging.warn("client arriving for the wrong barrier: %s != %s",
+                              self._tag, tag)
+                 refusal = "!tag"
+             elif name in self._waiting:
+                 logging.warn("duplicate client")
+                 refusal = "!dup"
+
+             if refusal is not None:
+                 sock.settimeout(5)
+                 sock.send(refusal)
+                 sock.close()
+                 return
+
+             # Acknowledge the client
+             sock.send("wait")
+
+         except socket.timeout:
+             # This is nominally an error, but as we do not know who that
+             # was we cannot do anything sane other than report it and let
+             # the normal timeout kill us when that is appropriate.
+             logging.warn("client handshake timeout: (%s:%d)",
+                          peer[0], peer[1])
+             sock.close()
+             return
+
+         logging.info("client now waiting: %s (%s:%d)",
+                      name, peer[0], peer[1])
+
+         # They seem to be valid, record them.
+         self._waiting[name] = connection
+         self._seen += 1
+
+
+     def _slave_hello(self, connection):
+         """
+         Slave side of the handshake with the master.
+
+         Sends "<tag> <hostid>" and expects the master to answer 'wait'.
+         On success the connection is recorded in self._waiting under our
+         own host id and self._seen is set to 1. On any other reply, or on
+         a handshake timeout, the socket is closed and nothing is recorded.
+
+         @param connection: A (socket, address) tuple for the master.
+         """
+         sock, peer = connection
+
+         sock.settimeout(5)
+         try:
+             sock.send(" ".join((self._tag, self._hostid)))
+
+             answer = sock.recv(4).strip("\r\n")
+             logging.info("master said: %s", answer)
+
+             # Confirm the master accepted the connection.
+             accepted = (answer == "wait")
+             if not accepted:
+                 logging.warn("Bad connection request to master")
+                 sock.close()
+                 return
+
+         except socket.timeout:
+             # This is nominally an error, but there is nothing sane to do
+             # other than report it and let the normal timeout kill us when
+             # that is appropriate.
+             logging.error("master handshake timeout: (%s:%d)",
+                           peer[0], peer[1])
+             sock.close()
+             return
+
+         logging.info("slave now waiting: (%s:%d)", peer[0], peer[1])
+
+         # The master seems to be valid, record the connection.
+         self._waiting[self._hostid] = connection
+         self._seen = 1
+
+
+     def _master_release(self):
+         """
+         Ping every waiting client, then release (or abort) all of them.
+
+         Each client in self._waiting is sent 'ping' and must answer 'pong',
+         or 'abrt' to request an abort. If every client answered, all of
+         them are sent 'rlse', or 'abrt' when this host or any client asked
+         for an abort. Failing to deliver that final message to a client
+         does not fail the barrier: once every client has answered the ping
+         the barrier is considered met.
+
+         @raise error.BarrierError: If any client failed to answer the ping.
+         @raise BarrierAbortError: If an abort was requested; raised after
+                 the 'abrt' messages have been sent.
+         """
+         # Check everyone is still there, that they have not
+         # crashed or disconnected in the meantime.
+         allpresent = True
+         abort = self._abort
+         for name in self._waiting:
+             client = self._waiting[name][0]
+
+             logging.info("checking client present: %s", name)
+
+             client.settimeout(5)
+             reply = 'none'
+             try:
+                 client.send("ping")
+                 reply = client.recv(1024)
+             except socket.timeout:
+                 logging.warn("ping/pong timeout: %s", name)
+             except socket.error, err:
+                 # A reset or broken connection means the client is gone;
+                 # report it as a lost client, not as a raw socket error.
+                 logging.warn("ping/pong failed: %s (%s)", name, err)
+
+             if reply == 'abrt':
+                 logging.warn("Client %s requested abort", name)
+                 abort = True
+             elif reply != "pong":
+                 allpresent = False
+
+         if not allpresent:
+             raise error.BarrierError("master lost client")
+
+         if abort:
+             logging.info("Aborting the clients")
+             msg = 'abrt'
+         else:
+             logging.info("Releasing clients")
+             msg = 'rlse'
+
+         # If everyone checks in then commit the release.
+         for name in self._waiting:
+             client = self._waiting[name][0]
+
+             client.settimeout(5)
+             try:
+                 client.send(msg)
+             except socket.timeout:
+                 logging.warn("release timeout: %s", name)
+             except socket.error, err:
+                 # The barrier has already been met; a client that vanishes
+                 # now has broken just after it, so carry on.
+                 logging.warn("release failed: %s (%s)", name, err)
+
+         if abort:
+             raise BarrierAbortError("Client requested abort")
+
+
+     def _waiting_close(self):
+         """
+         Close the socket of every connection recorded in self._waiting.
+
+         Either way, close out all the clients. If we have not released
+         them then they know to abort. Closing is best effort: a failure
+         on one socket is logged and the remaining ones are still closed.
+         """
+         for name in self._waiting:
+             client = self._waiting[name][0]
+
+             logging.info("closing client: %s", name)
+
+             try:
+                 client.close()
+             except Exception, err:
+                 # Best effort only, but do not use a bare except: that
+                 # would also swallow KeyboardInterrupt and SystemExit.
+                 logging.debug("error closing client %s: %s", name, err)
+
+
+     def _run_server(self, is_master):
+         """
+         Accept incoming connections until the barrier completes.
+
+         As master, every caller goes through _master_welcome and, once all
+         members have been seen, _master_release is run. As slave, the
+         caller is greeted with _slave_hello and, once the master has
+         connected, _slave_wait is run. An accept timeout is logged and the
+         loop goes round again; the overall timeout is enforced by
+         _remaining().
+
+         All recorded connections are closed on the way out, and so is the
+         listening socket when it was created here rather than supplied.
+
+         @param is_master: True to act as the master, False as a slave.
+         """
+         owns_listener = not self._server
+         if owns_listener:
+             listener = listen_server(port=self._port)
+         else:
+             listener = self._server
+
+         try:
+             finished = False
+             while not finished:
+                 try:
+                     # Wait for callers, welcoming each.
+                     listener.socket.settimeout(self._remaining())
+                     caller = listener.socket.accept()
+                     if is_master:
+                         self._master_welcome(caller)
+                     else:
+                         self._slave_hello(caller)
+                 except socket.timeout:
+                     logging.warn("timeout waiting for remaining clients")
+
+                 if is_master:
+                     # Check if everyone is here.
+                     logging.info("master seen %d of %d",
+                                  self._seen, len(self._members))
+                     if self._seen == len(self._members):
+                         self._master_release()
+                         finished = True
+                 elif self._seen:
+                     # The master has connected.
+                     logging.info("slave connected to master")
+                     self._slave_wait()
+                     finished = True
+         finally:
+             self._waiting_close()
+             # Only close the listening socket if it was created at the
+             # top of this function.
+             if owns_listener:
+                 listener.close()
+
+
+     def _run_client(self, is_master):
+         """
+         Make outgoing connections until the barrier completes.
+
+         As master, each slave in self._members is called in turn and
+         welcomed with _master_welcome; once all have been seen
+         _master_release is run. As slave, the master is called and greeted
+         with _slave_hello; once connected _slave_wait is run. A connect
+         timeout or a refused connection is retried after a 10 second
+         sleep; the overall timeout is enforced by _remaining().
+
+         All recorded connections are closed on the way out, including when
+         an exception escapes.
+
+         @param is_master: True to act as the master, False as a slave.
+         @raise socket.error: For connection errors other than a timeout or
+                 a refused connection.
+         """
+         try:
+             while self._remaining() is None or self._remaining() > 0:
+                 if is_master:
+                     # Connect to all slaves.
+                     host = self._get_host_from_id(self._members[self._seen])
+                     logging.info("calling slave: %s", host)
+                 else:
+                     # Just connect to the master.
+                     host = self._get_host_from_id(self._masterid)
+                     logging.info("calling master")
+
+                 remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                 try:
+                     remote.settimeout(30)
+                     connection = (remote, (host, self._port))
+                     remote.connect(connection[1])
+                     if is_master:
+                         self._master_welcome(connection)
+                     else:
+                         self._slave_hello(connection)
+                 except socket.timeout:
+                     logging.warn("timeout calling host, retry")
+                     # This attempt's socket is never recorded in
+                     # self._waiting, so release it before retrying.
+                     remote.close()
+                     sleep(10)
+                 except socket.error, err:
+                     # Read the errno defensively: not every socket.error
+                     # carries an (errno, message) pair.
+                     code = None
+                     if err.args:
+                         code = err.args[0]
+                     remote.close()
+                     if code != errno.ECONNREFUSED:
+                         raise
+                     sleep(10)
+
+                 if is_master:
+                     # Check if everyone is here.
+                     logging.info("master seen %d of %d",
+                                  self._seen, len(self._members))
+                     if self._seen == len(self._members):
+                         self._master_release()
+                         break
+                 else:
+                     # Check if master connected.
+                     if self._seen:
+                         logging.info("slave connected to master")
+                         self._slave_wait()
+                         break
+         finally:
+             # Match _run_server: never leave recorded connections open,
+             # even when the barrier fails.
+             self._waiting_close()
+
+
+     def _slave_wait(self):
+         """
+         Follow the master's instructions until it closes the connection.
+
+         Reads 4-byte control messages from the master recorded under our
+         own host id in self._waiting. A 'ping' is answered with 'pong', or
+         with 'abrt' when this host requested an abort. The last message
+         received before the connection closes decides the outcome: 'rlse'
+         is success, anything else raises.
+
+         @raise BarrierAbortError: If the last message was 'abrt'.
+         @raise error.BarrierError: For any other last message than 'rlse',
+                 including the connection closing before one arrived.
+         """
+         master = self._waiting[self._hostid][0]
+         last = "wait"
+         while True:
+             # All control messages are the same size to allow
+             # us to split individual messages easily.
+             master.settimeout(self._remaining())
+             data = master.recv(4)
+             if not data:
+                 break
+
+             last = data.strip("\r\n")
+             logging.info("master said: %s", last)
+
+             if last == "ping":
+                 # Ensure we have sufficient time for the
+                 # ping/pong/rlse cycle to complete normally.
+                 self._update_timeout(10 + 10 * len(self._members))
+
+                 answer = "pong"
+                 if self._abort:
+                     answer = "abrt"
+                 logging.info(answer)
+                 master.settimeout(self._remaining())
+                 master.send(answer)
+
+             elif last in ("rlse", "abrt"):
+                 # Ensure we have sufficient time for the
+                 # ping/pong/rlse cycle to complete normally.
+                 self._update_timeout(10 + 10 * len(self._members))
+
+                 logging.info("was released, waiting for close")
+
+         if last == "rlse":
+             return
+         if last == "abrt":
+             raise BarrierAbortError("Client requested abort")
+
+         # Any other final state means the master gave up on the barrier.
+         reasons = {
+             "wait": "master abort -- barrier timeout",
+             "ping": "master abort -- client lost",
+             "!tag": "master abort -- incorrect tag",
+             "!dup": "master abort -- duplicate client",
+         }
+         if last in reasons:
+             raise error.BarrierError(reasons[last])
+         raise error.BarrierError("master handshake failure: " + last)
+
+
+     def rendezvous(self, *hosts, **dargs):
+         """
+         Wait at the barrier until every listed host has arrived.
+
+         The hosts are sorted and the first one becomes the master, which
+         listens for connections; every other host connects to it as a
+         slave. With a single host there is nobody to wait for and the
+         call returns immediately.
+
+         @param hosts: Host ids of all the members, including this host.
+         @param dargs: Only 'abort' is used; if called with abort=True,
+                 this will raise an exception on all the clients.
+         """
+         self._start_time = time()
+         self._members = sorted(hosts)
+         self._masterid = self._members.pop(0)
+         self._abort = dargs.get('abort', False)
+
+         logging.info("masterid: %s", self._masterid)
+         if self._abort:
+             logging.debug("%s is aborting", self._hostid)
+         if not self._members:
+             logging.info("No other members listed.")
+             return
+         logging.info("members: %s", ",".join(self._members))
+
+         self._seen = 0
+         self._waiting = {}
+
+         # Figure out who is the master in this barrier.
+         am_master = (self._hostid == self._masterid)
+         if am_master:
+             logging.info("selected as master")
+         else:
+             logging.info("selected as slave")
+
+         if am_master:
+             self._run_server(is_master=True)
+         else:
+             self._run_client(is_master=False)
+
+
+     def rendezvous_servers(self, masterid, *hosts, **dargs):
+         """
+         Wait at the barrier with the connection direction reversed.
+
+         Unlike rendezvous(), the master is named explicitly and connects
+         out to each of the listed hosts, which act as servers. With no
+         hosts listed there is nobody to wait for and the call returns
+         immediately.
+
+         @param masterid: Host id of the master.
+         @param hosts: Host ids of the other members (the slaves).
+         @param dargs: Only 'abort' is used; if called with abort=True,
+                 this will raise an exception on all the clients.
+         """
+         self._start_time = time()
+         self._members = list(hosts)
+         self._members.sort()
+         self._masterid = masterid
+         self._abort = dargs.get('abort', False)
+
+         logging.info("masterid: %s", self._masterid)
+         # Log the abort request here as well, as rendezvous() does.
+         if self._abort:
+             logging.debug("%s is aborting", self._hostid)
+         if not len(self._members):
+             logging.info("No other members listed.")
+             return
+         logging.info("members: %s", ",".join(self._members))
+
+         self._seen = 0
+         self._waiting = {}
+
+         # Figure out who is the master in this barrier.
+         if self._hostid == self._masterid:
+             logging.info("selected as master")
+             self._run_client(is_master=True)
+         else:
+             logging.info("selected as slave")
+             self._run_server(is_master=False)