staging/lustre/llite: allocate and free client cache asynchronously

Since the inflight request holds import refcount as well as export,
sometimes obd_disconnect() in client_common_put_super() can't put
the last refcount of OSC import (e.g. due to network disconnection),
this will cause cl_cache being accessed after free.

To fix this issue, ccc_users is used as cl_cache refcount, and
lov/llite/osc all hold one cl_cache refcount respectively, to avoid
the race that a new OST is being added into the system when the client
is mounted.
The following cl_cache functions are added:
- cl_cache_init(): allocate and initialize cl_cache
- cl_cache_incref(): increase cl_cache refcount
- cl_cache_decref(): decrease cl_cache refcount and free the cache
  if refcount=0.

Signed-off-by: Emoly Liu <emoly.liu@intel.com>
Reviewed-on: http://review.whamcloud.com/13746
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-6173
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Signed-off-by: Oleg Drokin <green@linuxhacker.ru>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
diff --git a/drivers/staging/lustre/lustre/include/cl_object.h b/drivers/staging/lustre/lustre/include/cl_object.h
index 36ca935..3cd4a25 100644
--- a/drivers/staging/lustre/lustre/include/cl_object.h
+++ b/drivers/staging/lustre/lustre/include/cl_object.h
@@ -2322,7 +2322,8 @@
  */
 struct cl_client_cache {
 	/**
-	 * # of users (OSCs)
+	 * # of client cache refcount
+	 * # of users (OSCs) + 2 (held by llite and lov)
 	 */
 	atomic_t		ccc_users;
 	/**
@@ -2357,6 +2358,13 @@
 
 };
 
+/**
+ * cl_cache functions
+ */
+struct cl_client_cache *cl_cache_init(unsigned long lru_page_max);
+void cl_cache_incref(struct cl_client_cache *cache);
+void cl_cache_decref(struct cl_client_cache *cache);
+
 /** @} cl_page */
 
 /** \defgroup cl_lock cl_lock
diff --git a/drivers/staging/lustre/lustre/include/obd.h b/drivers/staging/lustre/lustre/include/obd.h
index 9971ee5..40909b0 100644
--- a/drivers/staging/lustre/lustre/include/obd.h
+++ b/drivers/staging/lustre/lustre/include/obd.h
@@ -415,7 +415,7 @@
 	enum lustre_sec_part    lov_sp_me;
 
 	/* Cached LRU and unstable data from upper layer */
-	void		       *lov_cache;
+	struct cl_client_cache *lov_cache;
 
 	struct rw_semaphore     lov_notify_lock;
 
diff --git a/drivers/staging/lustre/lustre/llite/llite_internal.h b/drivers/staging/lustre/lustre/llite/llite_internal.h
index 8fe63bd..657ebd0 100644
--- a/drivers/staging/lustre/lustre/llite/llite_internal.h
+++ b/drivers/staging/lustre/lustre/llite/llite_internal.h
@@ -452,7 +452,7 @@
 	 * any page which is sent to a server as part of a bulk request,
 	 * but is uncommitted to stable storage.
 	 */
-	struct cl_client_cache    ll_cache;
+	struct cl_client_cache    *ll_cache;
 
 	struct lprocfs_stats     *ll_ra_stats;
 
diff --git a/drivers/staging/lustre/lustre/llite/llite_lib.c b/drivers/staging/lustre/lustre/llite/llite_lib.c
index ae6a571..4ab1f2b 100644
--- a/drivers/staging/lustre/lustre/llite/llite_lib.c
+++ b/drivers/staging/lustre/lustre/llite/llite_lib.c
@@ -83,15 +83,11 @@
 	pages = si.totalram - si.totalhigh;
 	lru_page_max = pages / 2;
 
-	/* initialize ll_cache data */
-	atomic_set(&sbi->ll_cache.ccc_users, 0);
-	sbi->ll_cache.ccc_lru_max = lru_page_max;
-	atomic_set(&sbi->ll_cache.ccc_lru_left, lru_page_max);
-	spin_lock_init(&sbi->ll_cache.ccc_lru_lock);
-	INIT_LIST_HEAD(&sbi->ll_cache.ccc_lru);
-
-	atomic_set(&sbi->ll_cache.ccc_unstable_nr, 0);
-	init_waitqueue_head(&sbi->ll_cache.ccc_unstable_waitq);
+	sbi->ll_cache = cl_cache_init(lru_page_max);
+	if (!sbi->ll_cache) {
+		kfree(sbi);
+		return NULL;
+	}
 
 	sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32,
 					   SBI_DEFAULT_READAHEAD_MAX);
@@ -131,6 +127,11 @@
 {
 	struct ll_sb_info *sbi = ll_s2sbi(sb);
 
+	if (sbi->ll_cache) {
+		cl_cache_decref(sbi->ll_cache);
+		sbi->ll_cache = NULL;
+	}
+
 	kfree(sbi);
 }
 
@@ -488,8 +489,8 @@
 	cl_sb_init(sb);
 
 	err = obd_set_info_async(NULL, sbi->ll_dt_exp, sizeof(KEY_CACHE_SET),
-				 KEY_CACHE_SET, sizeof(sbi->ll_cache),
-				 &sbi->ll_cache, NULL);
+				 KEY_CACHE_SET, sizeof(*sbi->ll_cache),
+				 sbi->ll_cache, NULL);
 
 	sb->s_root = d_make_root(root);
 	if (!sb->s_root) {
@@ -534,8 +535,6 @@
 out_dt:
 	obd_disconnect(sbi->ll_dt_exp);
 	sbi->ll_dt_exp = NULL;
-	/* Make sure all OScs are gone, since cl_cache is accessing sbi. */
-	obd_zombie_barrier();
 out_md_fid:
 	obd_fid_fini(sbi->ll_md_exp->exp_obd);
 out_md:
@@ -585,10 +584,6 @@
 	obd_fid_fini(sbi->ll_dt_exp->exp_obd);
 	obd_disconnect(sbi->ll_dt_exp);
 	sbi->ll_dt_exp = NULL;
-	/* wait till all OSCs are gone, since cl_cache is accessing sbi.
-	 * see LU-2543.
-	 */
-	obd_zombie_barrier();
 
 	ldebugfs_unregister_mountpoint(sbi);
 
@@ -921,12 +916,12 @@
 	if (!force) {
 		struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
 
-		rc = l_wait_event(sbi->ll_cache.ccc_unstable_waitq,
-				  !atomic_read(&sbi->ll_cache.ccc_unstable_nr),
+		rc = l_wait_event(sbi->ll_cache->ccc_unstable_waitq,
+				  !atomic_read(&sbi->ll_cache->ccc_unstable_nr),
 				  &lwi);
 	}
 
-	ccc_count = atomic_read(&sbi->ll_cache.ccc_unstable_nr);
+	ccc_count = atomic_read(&sbi->ll_cache->ccc_unstable_nr);
 	if (!force && rc != -EINTR)
 		LASSERTF(!ccc_count, "count: %i\n", ccc_count);
 
diff --git a/drivers/staging/lustre/lustre/llite/lproc_llite.c b/drivers/staging/lustre/lustre/llite/lproc_llite.c
index 18a7775..e86bf3c 100644
--- a/drivers/staging/lustre/lustre/llite/lproc_llite.c
+++ b/drivers/staging/lustre/lustre/llite/lproc_llite.c
@@ -356,7 +356,7 @@
 {
 	struct super_block     *sb    = m->private;
 	struct ll_sb_info      *sbi   = ll_s2sbi(sb);
-	struct cl_client_cache *cache = &sbi->ll_cache;
+	struct cl_client_cache *cache = sbi->ll_cache;
 	int shift = 20 - PAGE_SHIFT;
 	int max_cached_mb;
 	int unused_mb;
@@ -383,7 +383,7 @@
 {
 	struct super_block *sb = ((struct seq_file *)file->private_data)->private;
 	struct ll_sb_info *sbi = ll_s2sbi(sb);
-	struct cl_client_cache *cache = &sbi->ll_cache;
+	struct cl_client_cache *cache = sbi->ll_cache;
 	struct lu_env *env;
 	int refcheck;
 	int mult, rc, pages_number;
@@ -822,7 +822,7 @@
 {
 	struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
 					      ll_kobj);
-	struct cl_client_cache *cache = &sbi->ll_cache;
+	struct cl_client_cache *cache = sbi->ll_cache;
 	int pages, mb;
 
 	pages = atomic_read(&cache->ccc_unstable_nr);
diff --git a/drivers/staging/lustre/lustre/lov/lov_obd.c b/drivers/staging/lustre/lustre/lov/lov_obd.c
index c87096e..9b92d55 100644
--- a/drivers/staging/lustre/lustre/lov/lov_obd.c
+++ b/drivers/staging/lustre/lustre/lov/lov_obd.c
@@ -892,6 +892,12 @@
 		kfree(lov->lov_tgts);
 		lov->lov_tgt_size = 0;
 	}
+
+	if (lov->lov_cache) {
+		cl_cache_decref(lov->lov_cache);
+		lov->lov_cache = NULL;
+	}
+
 	return 0;
 }
 
@@ -2121,6 +2127,7 @@
 		LASSERT(!lov->lov_cache);
 		lov->lov_cache = val;
 		do_inactive = 1;
+		cl_cache_incref(lov->lov_cache);
 	}
 
 	for (i = 0; i < count; i++, val = (char *)val + incr) {
diff --git a/drivers/staging/lustre/lustre/obdclass/cl_page.c b/drivers/staging/lustre/lustre/obdclass/cl_page.c
index 71bff49..db2dc6b 100644
--- a/drivers/staging/lustre/lustre/obdclass/cl_page.c
+++ b/drivers/staging/lustre/lustre/obdclass/cl_page.c
@@ -1072,3 +1072,49 @@
 	slice->cpl_page = page;
 }
 EXPORT_SYMBOL(cl_page_slice_add);
+
+/**
+ * Allocate and initialize cl_cache, called by ll_init_sbi().
+ */
+struct cl_client_cache *cl_cache_init(unsigned long lru_page_max)
+{
+	struct cl_client_cache	*cache = NULL;
+
+	cache = kzalloc(sizeof(*cache), GFP_KERNEL);
+	if (!cache)
+		return NULL;
+
+	/* Initialize cache data */
+	atomic_set(&cache->ccc_users, 1);
+	cache->ccc_lru_max = lru_page_max;
+	atomic_set(&cache->ccc_lru_left, lru_page_max);
+	spin_lock_init(&cache->ccc_lru_lock);
+	INIT_LIST_HEAD(&cache->ccc_lru);
+
+	atomic_set(&cache->ccc_unstable_nr, 0);
+	init_waitqueue_head(&cache->ccc_unstable_waitq);
+
+	return cache;
+}
+EXPORT_SYMBOL(cl_cache_init);
+
+/**
+ * Increase cl_cache refcount
+ */
+void cl_cache_incref(struct cl_client_cache *cache)
+{
+	atomic_inc(&cache->ccc_users);
+}
+EXPORT_SYMBOL(cl_cache_incref);
+
+/**
+ * Decrease cl_cache refcount and free the cache if refcount=0.
+ * Since llite, lov and osc all hold cl_cache refcount,
+ * the free will not cause race. (LU-6173)
+ */
+void cl_cache_decref(struct cl_client_cache *cache)
+{
+	if (atomic_dec_and_test(&cache->ccc_users))
+		kfree(cache);
+}
+EXPORT_SYMBOL(cl_cache_decref);
diff --git a/drivers/staging/lustre/lustre/osc/osc_page.c b/drivers/staging/lustre/lustre/osc/osc_page.c
index 18c261b..355f496 100644
--- a/drivers/staging/lustre/lustre/osc/osc_page.c
+++ b/drivers/staging/lustre/lustre/osc/osc_page.c
@@ -412,7 +412,7 @@
 	int pages = atomic_read(&cli->cl_lru_in_list);
 	unsigned long budget;
 
-	budget = cache->ccc_lru_max / atomic_read(&cache->ccc_users);
+	budget = cache->ccc_lru_max / (atomic_read(&cache->ccc_users) - 2);
 
 	/* if it's going to run out LRU slots, we should free some, but not
 	 * too much to maintain fairness among OSCs.
@@ -712,7 +712,7 @@
 	cache->ccc_lru_shrinkers++;
 	list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
 
-	max_scans = atomic_read(&cache->ccc_users);
+	max_scans = atomic_read(&cache->ccc_users) - 2;
 	while (--max_scans > 0 && !list_empty(&cache->ccc_lru)) {
 		cli = list_entry(cache->ccc_lru.next, struct client_obd,
 				 cl_lru_osc);
diff --git a/drivers/staging/lustre/lustre/osc/osc_request.c b/drivers/staging/lustre/lustre/osc/osc_request.c
index 7260027..9334349 100644
--- a/drivers/staging/lustre/lustre/osc/osc_request.c
+++ b/drivers/staging/lustre/lustre/osc/osc_request.c
@@ -2913,7 +2913,7 @@
 
 		LASSERT(!cli->cl_cache); /* only once */
 		cli->cl_cache = val;
-		atomic_inc(&cli->cl_cache->ccc_users);
+		cl_cache_incref(cli->cl_cache);
 		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
 
 		/* add this osc into entity list */
@@ -3293,7 +3293,7 @@
 		list_del_init(&cli->cl_lru_osc);
 		spin_unlock(&cli->cl_cache->ccc_lru_lock);
 		cli->cl_lru_left = NULL;
-		atomic_dec(&cli->cl_cache->ccc_users);
+		cl_cache_decref(cli->cl_cache);
 		cli->cl_cache = NULL;
 	}