ceph: allocate middle of message before starting to read

Both the front and middle parts of the message are now
allocated in ceph_alloc_msg().

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 1360708..25de15c0 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -1279,8 +1279,34 @@
 
 
 
+/*
+ * Receive into section until it holds sec_len bytes, resuming at iov_len.
+ * Returns 1 when full (crc32c stored in *crc), else the recvmsg ret (<= 0).
+ */
+static int read_partial_message_section(struct ceph_connection *con,
+		struct kvec *section, unsigned int sec_len, u32 *crc)
+{
+	int remaining, nread;
 
+	BUG_ON(!section);
 
+	while (section->iov_len < sec_len) {
+		BUG_ON(section->iov_base == NULL);
+		remaining = sec_len - section->iov_len;
+		nread = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
+					 section->iov_len, remaining);
+		if (nread <= 0)
+			return nread;
+		section->iov_len += nread;
+		if (section->iov_len == sec_len)	/* just got the last byte */
+			*crc = crc32c(0, section->iov_base, sec_len);
+	}
+	return 1;
+}
+
+static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+				struct ceph_msg_header *hdr,
+				int *skip);
 /*
  * read (part of) a message.
  */
@@ -1292,6 +1318,7 @@
 	int to, want, left;
 	unsigned front_len, middle_len, data_len, data_off;
 	int datacrc = con->msgr->nocrc;
+	int skip;
 
 	dout("read_partial_message con %p msg %p\n", con, m);
 
@@ -1315,7 +1342,6 @@
 			}
 		}
 	}
-
 	front_len = le32_to_cpu(con->in_hdr.front_len);
 	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
 		return -EIO;
@@ -1330,8 +1356,8 @@
 	if (!con->in_msg) {
 		dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
 		     con->in_hdr.front_len, con->in_hdr.data_len);
-		con->in_msg = con->ops->alloc_msg(con, &con->in_hdr);
-		if (!con->in_msg) {
+		con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
+		if (skip) {
 			/* skip this message */
 			pr_err("alloc_msg returned NULL, skipping message\n");
 			con->in_base_pos = -front_len - middle_len - data_len -
@@ -1342,56 +1368,28 @@
 		if (IS_ERR(con->in_msg)) {
 			ret = PTR_ERR(con->in_msg);
 			con->in_msg = NULL;
-			con->error_msg = "out of memory for incoming message";
+			con->error_msg = "error allocating memory for incoming message";
 			return ret;
 		}
 		m = con->in_msg;
 		m->front.iov_len = 0;    /* haven't read it yet */
+		if (m->middle)
+			m->middle->vec.iov_len = 0;
 		memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
 	}
 
 	/* front */
-	while (m->front.iov_len < front_len) {
-		BUG_ON(m->front.iov_base == NULL);
-		left = front_len - m->front.iov_len;
-		ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
-				       m->front.iov_len, left);
-		if (ret <= 0)
-			return ret;
-		m->front.iov_len += ret;
-		if (m->front.iov_len == front_len)
-			con->in_front_crc = crc32c(0, m->front.iov_base,
-						      m->front.iov_len);
-	}
+	ret = read_partial_message_section(con, &m->front, front_len,
+					   &con->in_front_crc);
+	if (ret <= 0)
+		return ret;
 
 	/* middle */
-	while (middle_len > 0 && (!m->middle ||
-				  m->middle->vec.iov_len < middle_len)) {
-		if (m->middle == NULL) {
-			ret = -EOPNOTSUPP;
-			if (con->ops->alloc_middle)
-				ret = con->ops->alloc_middle(con, m);
-			if (ret < 0) {
-				pr_err("alloc_middle fail skipping payload\n");
-				con->in_base_pos = -middle_len - data_len
-					- sizeof(m->footer);
-				ceph_msg_put(con->in_msg);
-				con->in_msg = NULL;
-				con->in_tag = CEPH_MSGR_TAG_READY;
-				return 0;
-			}
-			m->middle->vec.iov_len = 0;
-		}
-		left = middle_len - m->middle->vec.iov_len;
-		ret = ceph_tcp_recvmsg(con->sock,
-				       (char *)m->middle->vec.iov_base +
-				       m->middle->vec.iov_len, left);
+	if (m->middle) {
+		ret = read_partial_message_section(con, &m->middle->vec, middle_len,
+						   &con->in_middle_crc);
 		if (ret <= 0)
 			return ret;
-		m->middle->vec.iov_len += ret;
-		if (m->middle->vec.iov_len == middle_len)
-			con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
-						      m->middle->vec.iov_len);
 	}
 
 	/* (page) data */
@@ -2116,31 +2114,13 @@
 }
 
 /*
- * Generic message allocator, for incoming messages.
- */
-struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
-				struct ceph_msg_header *hdr)
-{
-	int type = le16_to_cpu(hdr->type);
-	int front_len = le32_to_cpu(hdr->front_len);
-	struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
-
-	if (!msg) {
-		pr_err("unable to allocate msg type %d len %d\n",
-		       type, front_len);
-		return ERR_PTR(-ENOMEM);
-	}
-	return msg;
-}
-
-/*
  * Allocate "middle" portion of a message, if it is needed and wasn't
  * allocated by alloc_msg.  This allows us to read a small fixed-size
  * per-type header in the front and then gracefully fail (i.e.,
  * propagate the error to the caller based on info in the front) when
  * the middle is too large.
  */
-int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
+static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
 {
 	int type = le16_to_cpu(msg->hdr.type);
 	int middle_len = le32_to_cpu(msg->hdr.middle_len);
@@ -2156,6 +2136,48 @@
 	return 0;
 }
 
+/*
+ * Generic message allocator, for incoming messages: use the connection's
+ * alloc_msg op, falling back to ceph_msg_new(); add a middle if hdr has one.
+ * Returns the msg, NULL if the op set *skip, or an ERR_PTR() on failure.
+ */
+static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+				struct ceph_msg_header *hdr,
+				int *skip)
+{
+	int type = le16_to_cpu(hdr->type);
+	int front_len = le32_to_cpu(hdr->front_len);
+	int middle_len = le32_to_cpu(hdr->middle_len);
+	struct ceph_msg *msg = NULL;
+	int ret;
+
+	*skip = 0;	/* caller reads skip before IS_ERR() */
+	if (con->ops->alloc_msg) {
+		msg = con->ops->alloc_msg(con, hdr, skip);
+		if (IS_ERR(msg))
+			return msg;
+		if (*skip)
+			return NULL;
+	}
+	if (!msg) {
+		msg = ceph_msg_new(type, front_len, 0, 0, NULL);
+		if (!msg) {
+			pr_err("unable to allocate msg type %d len %d\n",
+			       type, front_len);
+			return ERR_PTR(-ENOMEM);
+		}
+	}
+
+	if (middle_len) {
+		ret = ceph_alloc_middle(con, msg);
+		if (ret < 0) {
+			ceph_msg_put(msg);
+			return ERR_PTR(ret);	/* not msg: already put */
+		}
+	}
+	return msg;
+}
+
 
 /*
  * Free a generically kmalloc'd message.