ceph: rewrite msgpool using mempool_t

Since we don't need to maintain large pools of messages, we can just
use the standard mempool_t.  We maintain a msgpool 'wrapper' because we
need the mempool_t* in the alloc function, and mempool gives us only
pool_data.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/msgpool.c b/fs/ceph/msgpool.c
index 030297f..ca03222 100644
--- a/fs/ceph/msgpool.c
+++ b/fs/ceph/msgpool.c
@@ -7,106 +7,43 @@
 
 #include "msgpool.h"
 
-/*
- * We use msg pools to preallocate memory for messages we expect to
- * receive over the wire, to avoid getting ourselves into OOM
- * conditions at unexpected times.  We take use a few different
- * strategies:
- *
- *  - for request/response type interactions, we preallocate the
- * memory needed for the response when we generate the request.
- *
- *  - for messages we can receive at any time from the MDS, we preallocate
- * a pool of messages we can re-use.
- *
- *  - for writeback, we preallocate some number of messages to use for
- * requests and their replies, so that we always make forward
- * progress.
- *
- * The msgpool behaves like a mempool_t, but keeps preallocated
- * ceph_msgs strung together on a list_head instead of using a pointer
- * vector.  This avoids vector reallocation when we adjust the number
- * of preallocated items (which happens frequently).
- */
-
-
-/*
- * Allocate or release as necessary to meet our target pool size.
- */
-static int __fill_msgpool(struct ceph_msgpool *pool)
+static void *alloc_fn(gfp_t gfp_mask, void *arg)
 {
-	struct ceph_msg *msg;
+	struct ceph_msgpool *pool = arg;
+	struct ceph_msg *m;
 
-	while (pool->num < pool->min) {
-		dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
-		     pool->min);
-		spin_unlock(&pool->lock);
-		msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
-		spin_lock(&pool->lock);
-		if (IS_ERR(msg))
-			return PTR_ERR(msg);
-		msg->pool = pool;
-		list_add(&msg->list_head, &pool->msgs);
-		pool->num++;
-	}
-	while (pool->num > pool->min) {
-		msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
-		dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
-		     pool->min, msg);
-		list_del_init(&msg->list_head);
-		pool->num--;
-		ceph_msg_kfree(msg);
-	}
-	return 0;
+	m = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+	if (IS_ERR(m))
+		return NULL;
+	return m;
+}
+
+static void free_fn(void *element, void *arg)
+{
+	ceph_msg_put(element);
 }
 
 int ceph_msgpool_init(struct ceph_msgpool *pool,
-		      int front_len, int min, bool blocking)
+		      int front_len, int size, bool blocking)
 {
-	int ret;
-
-	dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
-	spin_lock_init(&pool->lock);
 	pool->front_len = front_len;
-	INIT_LIST_HEAD(&pool->msgs);
-	pool->num = 0;
-	pool->min = min;
-	pool->blocking = blocking;
-	init_waitqueue_head(&pool->wait);
-
-	spin_lock(&pool->lock);
-	ret = __fill_msgpool(pool);
-	spin_unlock(&pool->lock);
-	return ret;
+	pool->pool = mempool_create(size, alloc_fn, free_fn, pool);
+	if (!pool->pool)
+		return -ENOMEM;
+	return 0;
 }
 
 void ceph_msgpool_destroy(struct ceph_msgpool *pool)
 {
-	dout("msgpool_destroy %p\n", pool);
-	spin_lock(&pool->lock);
-	pool->min = 0;
-	__fill_msgpool(pool);
-	spin_unlock(&pool->lock);
+	mempool_destroy(pool->pool);
 }
 
-int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta)
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
+				  int front_len)
 {
-	int ret;
+	if (front_len > pool->front_len) {
+		struct ceph_msg *msg;
 
-	spin_lock(&pool->lock);
-	dout("msgpool_resv %p delta %d\n", pool, delta);
-	pool->min += delta;
-	ret = __fill_msgpool(pool);
-	spin_unlock(&pool->lock);
-	return ret;
-}
-
-struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len)
-{
-	wait_queue_t wait;
-	struct ceph_msg *msg;
-
-	if (front_len && front_len > pool->front_len) {
 		pr_err("msgpool_get pool %p need front %d, pool size is %d\n",
 		       pool, front_len, pool->front_len);
 		WARN_ON(1);
@@ -115,72 +52,17 @@
 		msg = ceph_msg_new(0, front_len, 0, 0, NULL);
 		if (!IS_ERR(msg))
 			return msg;
+		return NULL;
 	}
 
-	if (!front_len)
-		front_len = pool->front_len;
-
-	if (pool->blocking) {
-		/* mempool_t behavior; first try to alloc */
-		msg = ceph_msg_new(0, front_len, 0, 0, NULL);
-		if (!IS_ERR(msg))
-			return msg;
-	}
-
-	while (1) {
-		spin_lock(&pool->lock);
-		if (likely(pool->num)) {
-			msg = list_entry(pool->msgs.next, struct ceph_msg,
-					 list_head);
-			list_del_init(&msg->list_head);
-			pool->num--;
-			dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
-			     pool->num, pool->min);
-			spin_unlock(&pool->lock);
-			return msg;
-		}
-		pr_err("msgpool_get %p now %d/%d, %s\n", pool, pool->num,
-		       pool->min, pool->blocking ? "waiting" : "may fail");
-		spin_unlock(&pool->lock);
-
-		if (!pool->blocking) {
-			WARN_ON(1);
-
-			/* maybe we can allocate it now? */
-			msg = ceph_msg_new(0, front_len, 0, 0, NULL);
-			if (!IS_ERR(msg))
-				return msg;
-
-			pr_err("msgpool_get %p empty + alloc failed\n", pool);
-			return ERR_PTR(-ENOMEM);
-		}
-
-		init_wait(&wait);
-		prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
-		schedule();
-		finish_wait(&pool->wait, &wait);
-	}
+	return mempool_alloc(pool->pool, GFP_NOFS);
 }
 
 void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
 {
-	spin_lock(&pool->lock);
-	if (pool->num < pool->min) {
-		/* reset msg front_len; user may have changed it */
-		msg->front.iov_len = pool->front_len;
-		msg->hdr.front_len = cpu_to_le32(pool->front_len);
+	/* reset msg front_len; user may have changed it */
+	msg->front.iov_len = pool->front_len;
+	msg->hdr.front_len = cpu_to_le32(pool->front_len);
 
-		kref_init(&msg->kref);  /* retake a single ref */
-		list_add(&msg->list_head, &pool->msgs);
-		pool->num++;
-		dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
-		     pool->num, pool->min);
-		spin_unlock(&pool->lock);
-		wake_up(&pool->wait);
-	} else {
-		dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
-		     pool->num, pool->min);
-		spin_unlock(&pool->lock);
-		ceph_msg_kfree(msg);
-	}
+	kref_init(&msg->kref);  /* retake single ref */
 }
diff --git a/fs/ceph/msgpool.h b/fs/ceph/msgpool.h
index bc834bf..62a61c7 100644
--- a/fs/ceph/msgpool.h
+++ b/fs/ceph/msgpool.h
@@ -1,6 +1,7 @@
 #ifndef _FS_CEPH_MSGPOOL
 #define _FS_CEPH_MSGPOOL
 
+#include <linux/mempool.h>
 #include "messenger.h"
 
 /*
@@ -8,18 +9,13 @@
  * avoid unexpected OOM conditions.
  */
 struct ceph_msgpool {
-	spinlock_t lock;
+	mempool_t *pool;
 	int front_len;          /* preallocated payload size */
-	struct list_head msgs;  /* msgs in the pool; each has 1 ref */
-	int num, min;           /* cur, min # msgs in the pool */
-	bool blocking;
-	wait_queue_head_t wait;
 };
 
 extern int ceph_msgpool_init(struct ceph_msgpool *pool,
 			     int front_len, int size, bool blocking);
 extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
-extern int ceph_msgpool_resv(struct ceph_msgpool *, int delta);
 extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
 					 int front_len);
 extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);