libceph: drop mutex while allocating a message

In ceph_con_in_msg_alloc(), if no alloc_msg method is defined for a
connection, a new message is allocated with ceph_msg_new().

Drop the connection mutex before making this call.  After reacquiring
the mutex, make sure the connection is still open; if it is not, drop
the newly allocated message and return -EAGAIN.

This is preparing for the next patch, which ensures all connections
define an alloc_msg method, and then handles them all the same way.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Greg Farnum <greg@inktank.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 0f9933a..6ec6051 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2807,13 +2807,12 @@
 	int type = le16_to_cpu(hdr->type);
 	int front_len = le32_to_cpu(hdr->front_len);
 	int middle_len = le32_to_cpu(hdr->middle_len);
+	struct ceph_msg *msg;
 	int ret = 0;
 
 	BUG_ON(con->in_msg != NULL);
 
 	if (con->ops->alloc_msg) {
-		struct ceph_msg *msg;
-
 		mutex_unlock(&con->mutex);
 		msg = con->ops->alloc_msg(con, hdr, skip);
 		mutex_lock(&con->mutex);
@@ -2838,12 +2837,19 @@
 		}
 	}
 	if (!con->in_msg) {
-		con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
-		if (!con->in_msg) {
+		mutex_unlock(&con->mutex);
+		msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
+		mutex_lock(&con->mutex);
+		if (!msg) {
 			pr_err("unable to allocate msg type %d len %d\n",
 			       type, front_len);
 			return -ENOMEM;
 		}
+		if (con->state != CON_STATE_OPEN) {
+			ceph_msg_put(msg);
+			return -EAGAIN;
+		}
+		con->in_msg = msg;
 		con->in_msg->con = con->ops->get(con);
 		BUG_ON(con->in_msg->con == NULL);
 		con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);