ceph: use a shared zero page rather than one per messenger

Each messenger allocates a page to be used when writing zeroes
out in the event of error or other abnormal condition.  Instead,
use the kernel ZERO_PAGE() for that purpose.

Signed-off-by: Alex Elder <elder@dreamhost.com>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d11f91b..7383562 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -52,6 +52,9 @@
 static DEFINE_SPINLOCK(addr_str_lock);
 static int last_addr_str;
 
+static struct page *zero_page;		/* used in certain error cases */
+static void *zero_page_address;		/* kernel virtual addr of zero_page */
+
 const char *ceph_pr_addr(const struct sockaddr_storage *ss)
 {
 	int i;
@@ -99,18 +102,41 @@
 
 int ceph_msgr_init(void)
 {
+	BUG_ON(zero_page != NULL);
+	zero_page = ZERO_PAGE(0);
+	page_cache_get(zero_page);
+
+	BUG_ON(zero_page_address != NULL);
+	zero_page_address = kmap(zero_page);
+
 	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
 	if (!ceph_msgr_wq) {
 		pr_err("msgr_init failed to create workqueue\n");
+
+		zero_page_address = NULL;
+		kunmap(zero_page);
+		page_cache_release(zero_page);
+		zero_page = NULL;
+
 		return -ENOMEM;
 	}
+
 	return 0;
 }
 EXPORT_SYMBOL(ceph_msgr_init);
 
 void ceph_msgr_exit(void)
 {
+	BUG_ON(ceph_msgr_wq == NULL);
 	destroy_workqueue(ceph_msgr_wq);
+
+	BUG_ON(zero_page_address == NULL);
+	zero_page_address = NULL;
+
+	BUG_ON(zero_page == NULL);
+	kunmap(zero_page);
+	page_cache_release(zero_page);
+	zero_page = NULL;
 }
 EXPORT_SYMBOL(ceph_msgr_exit);
 
@@ -841,9 +867,9 @@
 			max_write = bv->bv_len;
 #endif
 		} else {
-			page = con->msgr->zero_page;
+			page = zero_page;
 			if (crc)
-				kaddr = page_address(con->msgr->zero_page);
+				kaddr = zero_page_address;
 		}
 		len = min_t(int, max_write - con->out_msg_pos.page_pos,
 			    total_max_write);
@@ -914,7 +940,7 @@
 
 	while (con->out_skip > 0) {
 		struct kvec iov = {
-			.iov_base = page_address(con->msgr->zero_page),
+			.iov_base = zero_page_address,
 			.iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
 		};
 
@@ -2222,15 +2248,6 @@
 
 	spin_lock_init(&msgr->global_seq_lock);
 
-	/* the zero page is needed if a request is "canceled" while the message
-	 * is being written over the socket */
-	msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
-	if (!msgr->zero_page) {
-		kfree(msgr);
-		return ERR_PTR(-ENOMEM);
-	}
-	kmap(msgr->zero_page);
-
 	if (myaddr)
 		msgr->inst.addr = *myaddr;
 
@@ -2247,8 +2264,6 @@
 void ceph_messenger_destroy(struct ceph_messenger *msgr)
 {
 	dout("destroy %p\n", msgr);
-	kunmap(msgr->zero_page);
-	__free_page(msgr->zero_page);
 	kfree(msgr);
 	dout("destroyed messenger %p\n", msgr);
 }