ceph: do caps accounting per mds_client

Caps related accounting is now being done per mds client instead
of just being global. This prepares ground work for a later revision
of the caps preallocated reservation list.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index d992880..47068b1 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -113,58 +113,41 @@
 	return cap_str[i];
 }
 
-/*
- * Cap reservations
- *
- * Maintain a global pool of preallocated struct ceph_caps, referenced
- * by struct ceph_caps_reservations.  This ensures that we preallocate
- * memory needed to successfully process an MDS response.  (If an MDS
- * sends us cap information and we fail to process it, we will have
- * problems due to the client and MDS being out of sync.)
- *
- * Reservations are 'owned' by a ceph_cap_reservation context.
- */
-static spinlock_t caps_list_lock;
-static struct list_head caps_list;  /* unused (reserved or unreserved) */
-static int caps_total_count;        /* total caps allocated */
-static int caps_use_count;          /* in use */
-static int caps_reserve_count;      /* unused, reserved */
-static int caps_avail_count;        /* unused, unreserved */
-static int caps_min_count;          /* keep at least this many (unreserved) */
-
-void __init ceph_caps_init(void)
+void ceph_caps_init(struct ceph_mds_client *mdsc)
 {
-	INIT_LIST_HEAD(&caps_list);
-	spin_lock_init(&caps_list_lock);
+	INIT_LIST_HEAD(&mdsc->caps_list);
+	spin_lock_init(&mdsc->caps_list_lock);
 }
 
-void ceph_caps_finalize(void)
+void ceph_caps_finalize(struct ceph_mds_client *mdsc)
 {
 	struct ceph_cap *cap;
 
-	spin_lock(&caps_list_lock);
-	while (!list_empty(&caps_list)) {
-		cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+	spin_lock(&mdsc->caps_list_lock);
+	while (!list_empty(&mdsc->caps_list)) {
+		cap = list_first_entry(&mdsc->caps_list,
+				       struct ceph_cap, caps_item);
 		list_del(&cap->caps_item);
 		kmem_cache_free(ceph_cap_cachep, cap);
 	}
-	caps_total_count = 0;
-	caps_avail_count = 0;
-	caps_use_count = 0;
-	caps_reserve_count = 0;
-	caps_min_count = 0;
-	spin_unlock(&caps_list_lock);
+	mdsc->caps_total_count = 0;
+	mdsc->caps_avail_count = 0;
+	mdsc->caps_use_count = 0;
+	mdsc->caps_reserve_count = 0;
+	mdsc->caps_min_count = 0;
+	spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(int delta)
+void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
 {
-	spin_lock(&caps_list_lock);
-	caps_min_count += delta;
-	BUG_ON(caps_min_count < 0);
-	spin_unlock(&caps_list_lock);
+	spin_lock(&mdsc->caps_list_lock);
+	mdsc->caps_min_count += delta;
+	BUG_ON(mdsc->caps_min_count < 0);
+	spin_unlock(&mdsc->caps_list_lock);
 }
 
-int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+		      struct ceph_cap_reservation *ctx, int need)
 {
 	int i;
 	struct ceph_cap *cap;
@@ -176,16 +159,17 @@
 	dout("reserve caps ctx=%p need=%d\n", ctx, need);
 
 	/* first reserve any caps that are already allocated */
-	spin_lock(&caps_list_lock);
-	if (caps_avail_count >= need)
+	spin_lock(&mdsc->caps_list_lock);
+	if (mdsc->caps_avail_count >= need)
 		have = need;
 	else
-		have = caps_avail_count;
-	caps_avail_count -= have;
-	caps_reserve_count += have;
-	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-	       caps_avail_count);
-	spin_unlock(&caps_list_lock);
+		have = mdsc->caps_avail_count;
+	mdsc->caps_avail_count -= have;
+	mdsc->caps_reserve_count += have;
+	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+					 mdsc->caps_reserve_count +
+					 mdsc->caps_avail_count);
+	spin_unlock(&mdsc->caps_list_lock);
 
 	for (i = have; i < need; i++) {
 		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@
 	}
 	BUG_ON(have + alloc != need);
 
-	spin_lock(&caps_list_lock);
-	caps_total_count += alloc;
-	caps_reserve_count += alloc;
-	list_splice(&newcaps, &caps_list);
+	spin_lock(&mdsc->caps_list_lock);
+	mdsc->caps_total_count += alloc;
+	mdsc->caps_reserve_count += alloc;
+	list_splice(&newcaps, &mdsc->caps_list);
 
-	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-	       caps_avail_count);
-	spin_unlock(&caps_list_lock);
+	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+					 mdsc->caps_reserve_count +
+					 mdsc->caps_avail_count);
+	spin_unlock(&mdsc->caps_list_lock);
 
 	ctx->count = need;
 	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
-	     ctx, caps_total_count, caps_use_count, caps_reserve_count,
-	     caps_avail_count);
+	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
+	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
 	return 0;
 
 out_alloc_count:
@@ -220,26 +205,29 @@
 	return ret;
 }
 
-int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
+int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+			struct ceph_cap_reservation *ctx)
 {
 	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
 	if (ctx->count) {
-		spin_lock(&caps_list_lock);
-		BUG_ON(caps_reserve_count < ctx->count);
-		caps_reserve_count -= ctx->count;
-		caps_avail_count += ctx->count;
+		spin_lock(&mdsc->caps_list_lock);
+		BUG_ON(mdsc->caps_reserve_count < ctx->count);
+		mdsc->caps_reserve_count -= ctx->count;
+		mdsc->caps_avail_count += ctx->count;
 		ctx->count = 0;
 		dout("unreserve caps %d = %d used + %d resv + %d avail\n",
-		     caps_total_count, caps_use_count, caps_reserve_count,
-		     caps_avail_count);
-		BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-		       caps_avail_count);
-		spin_unlock(&caps_list_lock);
+		     mdsc->caps_total_count, mdsc->caps_use_count,
+		     mdsc->caps_reserve_count, mdsc->caps_avail_count);
+		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+						 mdsc->caps_reserve_count +
+						 mdsc->caps_avail_count);
+		spin_unlock(&mdsc->caps_list_lock);
 	}
 	return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
+static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
+				struct ceph_cap_reservation *ctx)
 {
 	struct ceph_cap *cap = NULL;
 
@@ -247,71 +235,74 @@
 	if (!ctx) {
 		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
 		if (cap) {
-			caps_use_count++;
-			caps_total_count++;
+			mdsc->caps_use_count++;
+			mdsc->caps_total_count++;
 		}
 		return cap;
 	}
 
-	spin_lock(&caps_list_lock);
+	spin_lock(&mdsc->caps_list_lock);
 	dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-	     ctx, ctx->count, caps_total_count, caps_use_count,
-	     caps_reserve_count, caps_avail_count);
+	     ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
+	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
 	BUG_ON(!ctx->count);
-	BUG_ON(ctx->count > caps_reserve_count);
-	BUG_ON(list_empty(&caps_list));
+	BUG_ON(ctx->count > mdsc->caps_reserve_count);
+	BUG_ON(list_empty(&mdsc->caps_list));
 
 	ctx->count--;
-	caps_reserve_count--;
-	caps_use_count++;
+	mdsc->caps_reserve_count--;
+	mdsc->caps_use_count++;
 
-	cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
 	list_del(&cap->caps_item);
 
-	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-	       caps_avail_count);
-	spin_unlock(&caps_list_lock);
+	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
+	spin_unlock(&mdsc->caps_list_lock);
 	return cap;
 }
 
-void ceph_put_cap(struct ceph_cap *cap)
+void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
 {
-	spin_lock(&caps_list_lock);
+	spin_lock(&mdsc->caps_list_lock);
 	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
-	     cap, caps_total_count, caps_use_count,
-	     caps_reserve_count, caps_avail_count);
-	caps_use_count--;
+	     cap, mdsc->caps_total_count, mdsc->caps_use_count,
+	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
+	mdsc->caps_use_count--;
 	/*
 	 * Keep some preallocated caps around (ceph_min_count), to
 	 * avoid lots of free/alloc churn.
 	 */
-	if (caps_avail_count >= caps_reserve_count + caps_min_count) {
-		caps_total_count--;
+	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
+				      mdsc->caps_min_count) {
+		mdsc->caps_total_count--;
 		kmem_cache_free(ceph_cap_cachep, cap);
 	} else {
-		caps_avail_count++;
-		list_add(&cap->caps_item, &caps_list);
+		mdsc->caps_avail_count++;
+		list_add(&cap->caps_item, &mdsc->caps_list);
 	}
 
-	BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-	       caps_avail_count);
-	spin_unlock(&caps_list_lock);
+	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
+	spin_unlock(&mdsc->caps_list_lock);
 }
 
 void ceph_reservation_status(struct ceph_client *client,
 			     int *total, int *avail, int *used, int *reserved,
 			     int *min)
 {
+	struct ceph_mds_client *mdsc = &client->mdsc;
+
 	if (total)
-		*total = caps_total_count;
+		*total = mdsc->caps_total_count;
 	if (avail)
-		*avail = caps_avail_count;
+		*avail = mdsc->caps_avail_count;
 	if (used)
-		*used = caps_use_count;
+		*used = mdsc->caps_use_count;
 	if (reserved)
-		*reserved = caps_reserve_count;
+		*reserved = mdsc->caps_reserve_count;
 	if (min)
-		*min = caps_min_count;
+		*min = mdsc->caps_min_count;
 }
 
 /*
@@ -540,7 +531,7 @@
 			new_cap = NULL;
 		} else {
 			spin_unlock(&inode->i_lock);
-			new_cap = get_cap(caps_reservation);
+			new_cap = get_cap(mdsc, caps_reservation);
 			if (new_cap == NULL)
 				return -ENOMEM;
 			goto retry;
@@ -898,7 +889,7 @@
 		ci->i_auth_cap = NULL;
 
 	if (removed)
-		ceph_put_cap(cap);
+		ceph_put_cap(mdsc, cap);
 
 	if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
 		struct ceph_snap_realm *realm = ci->i_snap_realm;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 6e40db2..641a8a3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -449,7 +449,7 @@
 	kfree(req->r_path1);
 	kfree(req->r_path2);
 	put_request_session(req);
-	ceph_unreserve_caps(&req->r_caps_reservation);
+	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 	kfree(req);
 }
 
@@ -512,7 +512,8 @@
 {
 	req->r_tid = ++mdsc->last_tid;
 	if (req->r_num_caps)
-		ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+				  req->r_num_caps);
 	dout("__register_request %p tid %lld\n", req, req->r_tid);
 	ceph_mdsc_get_request(req);
 	__insert_request(mdsc, req);
@@ -764,7 +765,7 @@
 			last_inode = NULL;
 		}
 		if (old_cap) {
-			ceph_put_cap(old_cap);
+			ceph_put_cap(session->s_mdsc, old_cap);
 			old_cap = NULL;
 		}
 
@@ -793,7 +794,7 @@
 	if (last_inode)
 		iput(last_inode);
 	if (old_cap)
-		ceph_put_cap(old_cap);
+		ceph_put_cap(session->s_mdsc, old_cap);
 
 	return ret;
 }
@@ -1251,6 +1252,7 @@
 		return ERR_PTR(-ENOMEM);
 
 	mutex_init(&req->r_fill_mutex);
+	req->r_mdsc = mdsc;
 	req->r_started = jiffies;
 	req->r_resend_mds = -1;
 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1986,7 +1988,7 @@
 	if (err == 0) {
 		if (result == 0 && rinfo->dir_nr)
 			ceph_readdir_prepopulate(req, req->r_session);
-		ceph_unreserve_caps(&req->r_caps_reservation);
+		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
 	mutex_unlock(&req->r_fill_mutex);
 
@@ -2767,6 +2769,9 @@
 	spin_lock_init(&mdsc->dentry_lru_lock);
 	INIT_LIST_HEAD(&mdsc->dentry_lru);
 
+	ceph_caps_init(mdsc);
+	ceph_adjust_min_caps(mdsc, client->min_caps);
+
 	return 0;
 }
 
@@ -2962,6 +2967,7 @@
 	if (mdsc->mdsmap)
 		ceph_mdsmap_destroy(mdsc->mdsmap);
 	kfree(mdsc->sessions);
+	ceph_caps_finalize(mdsc);
 }
 
 
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e389902..8f21263 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -151,6 +151,7 @@
 struct ceph_mds_request {
 	u64 r_tid;                   /* transaction id */
 	struct rb_node r_node;
+	struct ceph_mds_client *r_mdsc;
 
 	int r_op;                    /* mds op code */
 	int r_mds;
@@ -267,6 +268,27 @@
 	spinlock_t        cap_dirty_lock;   /* protects above items */
 	wait_queue_head_t cap_flushing_wq;
 
+	/*
+	 * Cap reservations
+	 *
+	 * Maintain a global pool of preallocated struct ceph_caps, referenced
+	 * by struct ceph_caps_reservations.  This ensures that we preallocate
+	 * memory needed to successfully process an MDS response.  (If an MDS
+	 * sends us cap information and we fail to process it, we will have
+	 * problems due to the client and MDS being out of sync.)
+	 *
+	 * Reservations are 'owned' by a ceph_cap_reservation context.
+	 */
+	spinlock_t	caps_list_lock;
+	struct		list_head caps_list; /* unused (reserved or
+						unreserved) */
+	int		caps_total_count;    /* total caps allocated */
+	int		caps_use_count;      /* in use */
+	int		caps_reserve_count;  /* unused, reserved */
+	int		caps_avail_count;    /* unused, unreserved */
+	int		caps_min_count;      /* keep at least this many
+						(unreserved) */
+
 #ifdef CONFIG_DEBUG_FS
 	struct dentry 	  *debugfs_file;
 #endif
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index fa87f51..1a0bb48 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -630,7 +630,6 @@
 
 	/* caps */
 	client->min_caps = args->max_readdir;
-	ceph_adjust_min_caps(client->min_caps);
 
 	/* subsystems */
 	err = ceph_monc_init(&client->monc, client);
@@ -680,8 +679,6 @@
 
 	ceph_monc_stop(&client->monc);
 
-	ceph_adjust_min_caps(-client->min_caps);
-
 	ceph_debugfs_client_cleanup(client);
 	destroy_workqueue(client->wb_wq);
 	destroy_workqueue(client->pg_inv_wq);
@@ -1043,8 +1040,6 @@
 	if (ret)
 		goto out_msgr;
 
-	ceph_caps_init();
-
 	ret = register_filesystem(&ceph_fs_type);
 	if (ret)
 		goto out_icache;
@@ -1069,7 +1064,6 @@
 {
 	dout("exit_ceph\n");
 	unregister_filesystem(&ceph_fs_type);
-	ceph_caps_finalize();
 	destroy_caches();
 	ceph_msgr_exit();
 	ceph_debugfs_cleanup();
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 10a4a40..44d10cb 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -560,11 +560,13 @@
 /* what the mds thinks we want */
 extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
 
-extern void ceph_caps_init(void);
-extern void ceph_caps_finalize(void);
-extern void ceph_adjust_min_caps(int delta);
-extern int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need);
-extern int ceph_unreserve_caps(struct ceph_cap_reservation *ctx);
+extern void ceph_caps_init(struct ceph_mds_client *mdsc);
+extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
+extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+			     struct ceph_cap_reservation *ctx, int need);
+extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+			       struct ceph_cap_reservation *ctx);
 extern void ceph_reservation_status(struct ceph_client *client,
 				    int *total, int *avail, int *used,
 				    int *reserved, int *min);
@@ -806,7 +808,8 @@
 	__ceph_remove_cap(cap);
 	spin_unlock(&inode->i_lock);
 }
-extern void ceph_put_cap(struct ceph_cap *cap);
+extern void ceph_put_cap(struct ceph_mds_client *mdsc,
+			 struct ceph_cap *cap);
 
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);