ceph: refactor message data section allocation

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 25de15c0..e8742cc 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -1315,7 +1315,7 @@
 	struct ceph_msg *m = con->in_msg;
 	void *p;
 	int ret;
-	int to, want, left;
+	int to, left;
 	unsigned front_len, middle_len, data_len, data_off;
 	int datacrc = con->msgr->nocrc;
 	int skip;
@@ -1351,6 +1351,7 @@
 	data_len = le32_to_cpu(con->in_hdr.data_len);
 	if (data_len > CEPH_MSG_MAX_DATA_LEN)
 		return -EIO;
+	data_off = le16_to_cpu(con->in_hdr.data_off);
 
 	/* allocate message? */
 	if (!con->in_msg) {
@@ -1375,7 +1376,10 @@
 		m->front.iov_len = 0;    /* haven't read it yet */
 		if (m->middle)
 			m->middle->vec.iov_len = 0;
-		memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
+
+		con->in_msg_pos.page = 0;
+		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+		con->in_msg_pos.data_pos = 0;
 	}
 
 	/* front */
@@ -1393,31 +1397,6 @@
 	}
 
 	/* (page) data */
-	data_off = le16_to_cpu(m->hdr.data_off);
-	if (data_len == 0)
-		goto no_data;
-
-	if (m->nr_pages == 0) {
-		con->in_msg_pos.page = 0;
-		con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
-		con->in_msg_pos.data_pos = 0;
-		/* find pages for data payload */
-		want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
-		ret = -1;
-		mutex_unlock(&con->mutex);
-		if (con->ops->prepare_pages)
-			ret = con->ops->prepare_pages(con, m, want);
-		mutex_lock(&con->mutex);
-		if (ret < 0) {
-			dout("%p prepare_pages failed, skipping payload\n", m);
-			con->in_base_pos = -data_len - sizeof(m->footer);
-			ceph_msg_put(con->in_msg);
-			con->in_msg = NULL;
-			con->in_tag = CEPH_MSGR_TAG_READY;
-			return 0;
-		}
-		BUG_ON(m->nr_pages < want);
-	}
 	while (con->in_msg_pos.data_pos < data_len) {
 		left = min((int)(data_len - con->in_msg_pos.data_pos),
 			   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
@@ -1440,7 +1419,6 @@
 		}
 	}
 
-no_data:
 	/* footer */
 	to = sizeof(m->hdr) + sizeof(m->footer);
 	while (con->in_base_pos < to) {
@@ -2136,6 +2114,25 @@
 	return 0;
 }
 
+/* Have prepare_pages supply msg's data pages (drops con->mutex); <0 = failed or no op. */
+static int ceph_alloc_data_section(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	int ret = -1;	/* returned as-is when the connection has no prepare_pages op */
+	int data_len = le32_to_cpu(msg->hdr.data_len);
+	unsigned data_off = le16_to_cpu(msg->hdr.data_off);
+	int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
+
+	mutex_unlock(&con->mutex);
+	if (con->ops->prepare_pages)
+		ret = con->ops->prepare_pages(con, msg, want);
+	mutex_lock(&con->mutex);
+
+	/* assert only on success: on failure the caller must get ret and skip the message */
+	BUG_ON(ret >= 0 && msg->nr_pages < want);
+
+	return ret;
+}
+
 /*
  * Generic message allocator, for incoming messages.
  */
@@ -2146,6 +2143,7 @@
 	int type = le16_to_cpu(hdr->type);
 	int front_len = le32_to_cpu(hdr->front_len);
 	int middle_len = le32_to_cpu(hdr->middle_len);
+	int data_len = le32_to_cpu(hdr->data_len);
 	struct ceph_msg *msg = NULL;
 	int ret;
 
@@ -2166,6 +2164,7 @@
 			return ERR_PTR(-ENOMEM);
 		}
 	}
+	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
 	if (middle_len) {
 		ret = ceph_alloc_middle(con, msg);
@@ -2175,6 +2174,18 @@
 			return msg;
 		}
 	}
+
+	if (data_len) {
+		ret = ceph_alloc_data_section(con, msg);
+
+		if (ret < 0) {
+			*skip = 1;
+			ceph_msg_put(msg);
+			return NULL;
+		}
+	}
+
+
 	return msg;
 }