ceph: keep reserved replies on the request structure

This includes treating all the data preallocation and revokation
at the same place, not having to have a special case for
the reserved pages.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 44abe29..df21068 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -13,6 +13,8 @@
 #include "decode.h"
 #include "auth.h"
 
+#define OSD_REPLY_RESERVE_FRONT_LEN	512
+
 const static struct ceph_connection_operations osd_con_ops;
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
@@ -73,6 +75,16 @@
 	     req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
 }
 
+static void remove_replies(struct ceph_osd_request *req)
+{
+	int i;
+	int max = ARRAY_SIZE(req->replies);
+
+	for (i=0; i<max; i++) {
+		if (req->replies[i])
+			ceph_msg_put(req->replies[i]);
+	}
+}
 
 /*
  * requests
@@ -87,12 +99,13 @@
 		ceph_msg_put(req->r_request);
 	if (req->r_reply)
 		ceph_msg_put(req->r_reply);
-	if (req->r_con_filling_pages) {
+	remove_replies(req);
+	if (req->r_con_filling_msg) {
 		dout("release_request revoking pages %p from con %p\n",
-		     req->r_pages, req->r_con_filling_pages);
-		ceph_con_revoke_pages(req->r_con_filling_pages,
-				      req->r_pages);
-		ceph_con_put(req->r_con_filling_pages);
+		     req->r_pages, req->r_con_filling_msg);
+		ceph_con_revoke_message(req->r_con_filling_msg,
+				      req->r_reply);
+		ceph_con_put(req->r_con_filling_msg);
 	}
 	if (req->r_own_pages)
 		ceph_release_page_vector(req->r_pages,
@@ -104,6 +117,60 @@
 		kfree(req);
 }
 
+static int alloc_replies(struct ceph_osd_request *req, int num_reply)
+{
+	int i;
+	int max = ARRAY_SIZE(req->replies);
+
+	BUG_ON(num_reply > max);
+
+	for (i=0; i<num_reply; i++) {
+		req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
+		if (IS_ERR(req->replies[i])) {
+			int j;
+			int err = PTR_ERR(req->replies[i]);
+			for (j = 0; j<=i; j++) {
+				ceph_msg_put(req->replies[j]);
+			}
+			return err;
+		}
+	}
+
+	for (; i<max; i++) {
+		req->replies[i] = NULL;
+	}
+
+	req->cur_reply = 0;
+
+	return 0;
+}
+
+static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
+				       struct ceph_osd_request *req,
+				       int front_len)
+{
+	struct ceph_msg *reply;
+	if (req->r_con_filling_msg) {
+		dout("revoking reply msg %p from old con %p\n", req->r_reply,
+		     req->r_con_filling_msg);
+		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+		ceph_con_put(req->r_con_filling_msg);
+		req->cur_reply = 0;
+	}
+	reply = req->replies[req->cur_reply];
+	if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
+		/* maybe we can allocate it now? */
+		reply = ceph_msg_new(0, front_len, 0, 0, NULL);
+		if (!reply || IS_ERR(reply)) {
+			pr_err(" reply alloc failed, front_len=%d\n", front_len);
+			return ERR_PTR(-ENOMEM);
+		}
+	}
+	req->r_con_filling_msg = ceph_con_get(con);
+	req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
+	return ceph_msg_get(reply);
+}
+
 /*
  * build new request AND message, calculate layout, and adjust file
  * extent as needed.
@@ -147,7 +214,7 @@
 	if (req == NULL)
 		return ERR_PTR(-ENOMEM);
 
-	err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
+	err = alloc_replies(req, num_reply);
 	if (err) {
 		ceph_osdc_put_request(req);
 		return ERR_PTR(-ENOMEM);
@@ -173,7 +240,6 @@
 	else
 		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
 	if (IS_ERR(msg)) {
-		ceph_msgpool_resv(&osdc->msgpool_op_reply, -num_reply);
 		ceph_osdc_put_request(req);
 		return ERR_PTR(PTR_ERR(msg));
 	}
@@ -471,8 +537,6 @@
 	rb_erase(&req->r_node, &osdc->requests);
 	osdc->num_requests--;
 
-	ceph_msgpool_resv(&osdc->msgpool_op_reply, -req->r_num_prealloc_reply);
-
 	if (req->r_osd) {
 		/* make sure the original request isn't in flight. */
 		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
@@ -724,12 +788,12 @@
 	flags = le32_to_cpu(rhead->flags);
 
 	/*
-	 * if this connection filled our pages, drop our reference now, to
+	 * if this connection filled our message, drop our reference now, to
 	 * avoid a (safe but slower) revoke later.
 	 */
-	if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
-		dout(" got pages, dropping con_filling_pages ref %p\n", con);
-		req->r_con_filling_pages = NULL;
+	if (req->r_con_filling_msg == con && req->r_reply == msg) {
+		dout(" got pages, dropping con_filling_msg ref %p\n", con);
+		req->r_con_filling_msg = NULL;
 		ceph_con_put(con);
 	}
 
@@ -998,7 +1062,7 @@
  * find those pages.
  *  0 = success, -1 failure.
  */
-static int prepare_pages(struct ceph_connection *con,
+static int __prepare_pages(struct ceph_connection *con,
 			 struct ceph_msg_header *hdr,
 			 struct ceph_osd_request *req,
 			 u64 tid,
@@ -1017,20 +1081,10 @@
 
 	osdc = osd->o_osdc;
 
-	dout("prepare_pages on msg %p want %d\n", m, want);
-	dout("prepare_pages tid %llu has %d pages, want %d\n",
+	dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
 	     tid, req->r_num_pages, want);
 	if (unlikely(req->r_num_pages < want))
 		goto out;
-
-	if (req->r_con_filling_pages) {
-		dout("revoking pages %p from old con %p\n", req->r_pages,
-		     req->r_con_filling_pages);
-		ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
-		ceph_con_put(req->r_con_filling_pages);
-	}
-	req->r_con_filling_pages = ceph_con_get(con);
-	req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
 	m->pages = req->r_pages;
 	m->nr_pages = req->r_num_pages;
 	ret = 0; /* success */
@@ -1164,13 +1218,8 @@
 	err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
 	if (err < 0)
 		goto out_mempool;
-	err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
-	if (err < 0)
-		goto out_msgpool;
 	return 0;
 
-out_msgpool:
-	ceph_msgpool_destroy(&osdc->msgpool_op);
 out_mempool:
 	mempool_destroy(osdc->req_mempool);
 out:
@@ -1186,7 +1235,6 @@
 	}
 	mempool_destroy(osdc->req_mempool);
 	ceph_msgpool_destroy(&osdc->msgpool_op);
-	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
 
 /*
@@ -1323,17 +1371,17 @@
 	if (!req) {
 		*skip = 1;
 		m = NULL;
-		dout("prepare_pages unknown tid %llu\n", tid);
+		dout("alloc_msg unknown tid %llu\n", tid);
 		goto out;
 	}
-	m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
-	if (!m) {
+	m = __get_next_reply(con, req, front);
+	if (!m || IS_ERR(m)) {
 		*skip = 1;
 		goto out;
 	}
 
 	if (data_len > 0) {
-		err = prepare_pages(con, hdr, req, tid, m);
+		err = __prepare_pages(con, hdr, req, tid, m);
 		if (err < 0) {
 			*skip = 1;
 			ceph_msg_put(m);