ceph: use connection mutex to protect read and write stages

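Use a single per-connection mutex (con->mutex, previously out_mutex) to
protect both the read and write stages against concurrent ceph_con_*
calls.  Drop the mutex around callbacks into the upper layer
(get_authorizer, bad_proto, peer_reset, prepare_pages, dispatch) to avoid
nested locking: a callback may need to call something like
ceph_con_close, which takes the same mutex.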

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2e4e977..c03b418 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -316,7 +316,6 @@
 {
 	/* reset connection, out_queue, msg_ and connect_seq */
 	/* discard existing out_queue and msg_seq */
-	mutex_lock(&con->out_mutex);
 	ceph_msg_remove_list(&con->out_queue);
 	ceph_msg_remove_list(&con->out_sent);
 
@@ -332,7 +331,6 @@
 		con->out_msg = NULL;
 	}
 	con->in_seq = 0;
-	mutex_unlock(&con->out_mutex);
 }
 
 /*
@@ -343,7 +341,9 @@
 	dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
 	set_bit(CLOSED, &con->state);  /* in case there's queued work */
 	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
+	mutex_lock(&con->mutex);
 	reset_connection(con);
+	mutex_unlock(&con->mutex);
 	queue_con(con);
 }
 
@@ -392,7 +392,7 @@
 	memset(con, 0, sizeof(*con));
 	atomic_set(&con->nref, 1);
 	con->msgr = msgr;
-	mutex_init(&con->out_mutex);
+	mutex_init(&con->mutex);
 	INIT_LIST_HEAD(&con->out_queue);
 	INIT_LIST_HEAD(&con->out_sent);
 	INIT_DELAYED_WORK(&con->work, con_work);
@@ -571,11 +571,13 @@
 	int auth_len = 0;
 	int auth_protocol = 0;
 
+	mutex_unlock(&con->mutex);
 	if (con->ops->get_authorizer)
 		con->ops->get_authorizer(con, &auth_buf, &auth_len,
 					 &auth_protocol, &con->auth_reply_buf,
 					 &con->auth_reply_buf_len,
 					 con->auth_retry);
+	mutex_lock(&con->mutex);
 
 	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
 	con->out_connect.authorizer_len = cpu_to_le32(auth_len);
@@ -1094,10 +1096,13 @@
 		       le32_to_cpu(con->out_connect.protocol_version),
 		       le32_to_cpu(con->in_reply.protocol_version));
 		con->error_msg = "protocol version mismatch";
-		if (con->ops->bad_proto)
-			con->ops->bad_proto(con);
 		reset_connection(con);
 		set_bit(CLOSED, &con->state);  /* in case there's queued work */
+
+		mutex_unlock(&con->mutex);
+		if (con->ops->bad_proto)
+			con->ops->bad_proto(con);
+		mutex_lock(&con->mutex);
 		return -1;
 
 	case CEPH_MSGR_TAG_BADAUTHORIZER:
@@ -1133,9 +1138,11 @@
 		prepare_read_connect(con);
 
 		/* Tell ceph about it. */
+		mutex_unlock(&con->mutex);
 		pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
 		if (con->ops->peer_reset)
 			con->ops->peer_reset(con);
+		mutex_lock(&con->mutex);
 		break;
 
 	case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1221,7 +1228,6 @@
 	u64 ack = le64_to_cpu(con->in_temp_ack);
 	u64 seq;
 
-	mutex_lock(&con->out_mutex);
 	while (!list_empty(&con->out_sent)) {
 		m = list_first_entry(&con->out_sent, struct ceph_msg,
 				     list_head);
@@ -1232,7 +1238,6 @@
 		     le16_to_cpu(m->hdr.type), m);
 		ceph_msg_remove(m);
 	}
-	mutex_unlock(&con->out_mutex);
 	prepare_read_tag(con);
 }
 
@@ -1366,8 +1371,10 @@
 		/* find pages for data payload */
 		want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
 		ret = -1;
+		mutex_unlock(&con->mutex);
 		if (con->ops->prepare_pages)
 			ret = con->ops->prepare_pages(con, m, want);
+		mutex_lock(&con->mutex);
 		if (ret < 0) {
 			dout("%p prepare_pages failed, skipping payload\n", m);
 			con->in_base_pos = -data_len - sizeof(m->footer);
@@ -1454,9 +1461,8 @@
 	if (con->peer_name.type == 0)
 		con->peer_name = msg->hdr.src.name;
 
-	mutex_lock(&con->out_mutex);
 	con->in_seq++;
-	mutex_unlock(&con->out_mutex);
+	mutex_unlock(&con->mutex);
 
 	dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
 	     msg, le64_to_cpu(msg->hdr.seq),
@@ -1467,6 +1473,8 @@
 	     le32_to_cpu(msg->hdr.data_len),
 	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
 	con->ops->dispatch(con, msg);
+
+	mutex_lock(&con->mutex);
 	prepare_read_tag(con);
 }
 
@@ -1483,7 +1491,7 @@
 	dout("try_write start %p state %lu nref %d\n", con, con->state,
 	     atomic_read(&con->nref));
 
-	mutex_lock(&con->out_mutex);
+	mutex_lock(&con->mutex);
 more:
 	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
@@ -1576,7 +1584,7 @@
 done:
 	ret = 0;
 out:
-	mutex_unlock(&con->out_mutex);
+	mutex_unlock(&con->mutex);
 	dout("try_write done on %p\n", con);
 	return ret;
 }
@@ -1600,6 +1608,8 @@
 	dout("try_read start on %p\n", con);
 	msgr = con->msgr;
 
+	mutex_lock(&con->mutex);
+
 more:
 	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
 	     con->in_base_pos);
@@ -1693,6 +1703,7 @@
 done:
 	ret = 0;
 out:
+	mutex_unlock(&con->mutex);
 	dout("try_read done on %p\n", con);
 	return ret;
 
@@ -1818,6 +1829,8 @@
 
 	clear_bit(BUSY, &con->state);  /* to avoid an improbable race */
 
+	mutex_lock(&con->mutex);
+
 	con_close_socket(con);
 
 	if (con->in_msg) {
@@ -1827,24 +1840,24 @@
 
 	/* If there are no messages in the queue, place the connection
 	 * in a STANDBY state (i.e., don't try to reconnect just yet). */
-	mutex_lock(&con->out_mutex);
 	if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
 		dout("fault setting STANDBY\n");
 		set_bit(STANDBY, &con->state);
-		mutex_unlock(&con->out_mutex);
+		mutex_unlock(&con->mutex);
 		goto out;
 	}
 
 	/* Requeue anything that hasn't been acked, and retry after a
 	 * delay. */
 	list_splice_init(&con->out_sent, &con->out_queue);
-	mutex_unlock(&con->out_mutex);
 
 	if (con->delay == 0)
 		con->delay = BASE_DELAY_INTERVAL;
 	else if (con->delay < MAX_DELAY_INTERVAL)
 		con->delay *= 2;
 
+	mutex_unlock(&con->mutex);
+
 	/* explicitly schedule work to try to reconnect again later. */
 	dout("fault queueing %p delay %lu\n", con, con->delay);
 	con->ops->get(con);
@@ -1920,7 +1933,7 @@
 	msg->hdr.dst_erank = con->peer_addr.erank;
 
 	/* queue */
-	mutex_lock(&con->out_mutex);
+	mutex_lock(&con->mutex);
 	BUG_ON(!list_empty(&msg->list_head));
 	list_add_tail(&msg->list_head, &con->out_queue);
 	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -1929,7 +1942,7 @@
 	     le32_to_cpu(msg->hdr.front_len),
 	     le32_to_cpu(msg->hdr.middle_len),
 	     le32_to_cpu(msg->hdr.data_len));
-	mutex_unlock(&con->out_mutex);
+	mutex_unlock(&con->mutex);
 
 	/* if there wasn't anything waiting to send before, queue
 	 * new work */
@@ -1942,7 +1955,7 @@
  */
 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 {
-	mutex_lock(&con->out_mutex);
+	mutex_lock(&con->mutex);
 	if (!list_empty(&msg->list_head)) {
 		dout("con_revoke %p msg %p\n", con, msg);
 		list_del_init(&msg->list_head);
@@ -1959,7 +1972,7 @@
 	} else {
 		dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
 	}
-	mutex_unlock(&con->out_mutex);
+	mutex_unlock(&con->mutex);
 }
 
 /*