libceph: handle connection reopen race with callbacks

If a connection is closed and/or reopened (ceph_con_close, ceph_con_open)
it can race with a callback.  con_work does various state checks for
closed or reopened sockets at the beginning, but drops con->mutex before
making callbacks.  We need to check for state bit changes after retaking
the lock to ensure we restart con_work and execute those CLOSED/OPENING
tests or else we may end up operating under stale assumptions.

In Jim's case, this was causing 'bad tag' errors.

There are four cases where we re-take the con->mutex inside con_work: catch
them all and return EAGAIN from try_{read,write} so that we can restart
con_work.

Reported-by: Jim Schutt <jaschut@sandia.gov>
Tested-by: Jim Schutt <jaschut@sandia.gov>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e15a82c..b140dd3 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -598,7 +598,7 @@
  * Connection negotiation.
  */
 
-static void prepare_connect_authorizer(struct ceph_connection *con)
+static int prepare_connect_authorizer(struct ceph_connection *con)
 {
 	void *auth_buf;
 	int auth_len = 0;
@@ -612,6 +612,10 @@
 					 con->auth_retry);
 	mutex_lock(&con->mutex);
 
+	if (test_bit(CLOSED, &con->state) ||
+	    test_bit(OPENING, &con->state))
+		return -EAGAIN;
+
 	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
 	con->out_connect.authorizer_len = cpu_to_le32(auth_len);
 
@@ -619,6 +623,8 @@
 	con->out_kvec[con->out_kvec_left].iov_len = auth_len;
 	con->out_kvec_left++;
 	con->out_kvec_bytes += auth_len;
+
+	return 0;
 }
 
 /*
@@ -640,9 +646,9 @@
 	set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_write_connect(struct ceph_messenger *msgr,
-				  struct ceph_connection *con,
-				  int after_banner)
+static int prepare_write_connect(struct ceph_messenger *msgr,
+				 struct ceph_connection *con,
+				 int after_banner)
 {
 	unsigned global_seq = get_global_seq(con->msgr, 0);
 	int proto;
@@ -683,7 +689,7 @@
 	con->out_more = 0;
 	set_bit(WRITE_PENDING, &con->state);
 
-	prepare_connect_authorizer(con);
+	return prepare_connect_authorizer(con);
 }
 
 
@@ -1216,6 +1222,7 @@
 	u64 sup_feat = con->msgr->supported_features;
 	u64 req_feat = con->msgr->required_features;
 	u64 server_feat = le64_to_cpu(con->in_reply.features);
+	int ret;
 
 	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
 
@@ -1250,7 +1257,9 @@
 			return -1;
 		}
 		con->auth_retry = 1;
-		prepare_write_connect(con->msgr, con, 0);
+		ret = prepare_write_connect(con->msgr, con, 0);
+		if (ret < 0)
+			return ret;
 		prepare_read_connect(con);
 		break;
 
@@ -1277,6 +1286,9 @@
 		if (con->ops->peer_reset)
 			con->ops->peer_reset(con);
 		mutex_lock(&con->mutex);
+		if (test_bit(CLOSED, &con->state) ||
+		    test_bit(OPENING, &con->state))
+			return -EAGAIN;
 		break;
 
 	case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1810,6 +1822,17 @@
 more:
 	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
 	     con->in_base_pos);
+
+	/*
+	 * process_connect and process_message drop and re-take
+	 * con->mutex.  make sure we handle a racing close or reopen.
+	 */
+	if (test_bit(CLOSED, &con->state) ||
+	    test_bit(OPENING, &con->state)) {
+		ret = -EAGAIN;
+		goto out;
+	}
+
 	if (test_bit(CONNECTING, &con->state)) {
 		if (!test_bit(NEGOTIATING, &con->state)) {
 			dout("try_read connecting\n");
@@ -1938,8 +1961,10 @@
 {
 	struct ceph_connection *con = container_of(work, struct ceph_connection,
 						   work.work);
+	int ret;
 
 	mutex_lock(&con->mutex);
+restart:
 	if (test_and_clear_bit(BACKOFF, &con->state)) {
 		dout("con_work %p backing off\n", con);
 		if (queue_delayed_work(ceph_msgr_wq, &con->work,
@@ -1969,18 +1994,31 @@
 		con_close_socket(con);
 	}
 
-	if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
-	    try_read(con) < 0 ||
-	    try_write(con) < 0) {
-		mutex_unlock(&con->mutex);
-		ceph_fault(con);     /* error/fault path */
-		goto done_unlocked;
-	}
+	if (test_and_clear_bit(SOCK_CLOSED, &con->state))
+		goto fault;
+
+	ret = try_read(con);
+	if (ret == -EAGAIN)
+		goto restart;
+	if (ret < 0)
+		goto fault;
+
+	ret = try_write(con);
+	if (ret == -EAGAIN)
+		goto restart;
+	if (ret < 0)
+		goto fault;
 
 done:
 	mutex_unlock(&con->mutex);
 done_unlocked:
 	con->ops->put(con);
+	return;
+
+fault:
+	mutex_unlock(&con->mutex);
+	ceph_fault(con);     /* error/fault path */
+	goto done_unlocked;
 }