ocfs2_dlm: fix cluster-wide refcounting of lock resources

This was previously broken and migration of some locks had to be temporarily
disabled. We use a new (and backward-incompatible) set of network messages
to account for all references to a lock resources held across the cluster.
once these are all freed, the master node may then free the lock resource
memory once its local references are dropped.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 6b6ff76..9fa4271 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -222,6 +222,7 @@
 #define DLM_LOCK_RES_DIRTY                0x00000008
 #define DLM_LOCK_RES_IN_PROGRESS          0x00000010
 #define DLM_LOCK_RES_MIGRATING            0x00000020
+#define DLM_LOCK_RES_DROPPING_REF         0x00000040
 
 /* max milliseconds to wait to sync up a network failure with a node death */
 #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
@@ -265,6 +266,8 @@
 	u8  owner;              //node which owns the lock resource, or unknown
 	u16 state;
 	char lvb[DLM_LVB_LEN];
+	unsigned int inflight_locks;
+	unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 };
 
 struct dlm_migratable_lock
@@ -367,7 +370,7 @@
 	DLM_CONVERT_LOCK_MSG,	 /* 504 */
 	DLM_PROXY_AST_MSG,	 /* 505 */
 	DLM_UNLOCK_LOCK_MSG,	 /* 506 */
-	DLM_UNUSED_MSG2,	 /* 507 */
+	DLM_DEREF_LOCKRES_MSG,	 /* 507 */
 	DLM_MIGRATE_REQUEST_MSG, /* 508 */
 	DLM_MIG_LOCKRES_MSG, 	 /* 509 */
 	DLM_QUERY_JOIN_MSG,	 /* 510 */
@@ -417,6 +420,9 @@
 	u8 name[O2NM_MAX_NAME_LEN];
 };
 
+#define DLM_ASSERT_RESPONSE_REASSERT       0x00000001
+#define DLM_ASSERT_RESPONSE_MASTERY_REF    0x00000002
+
 #define DLM_ASSERT_MASTER_MLE_CLEANUP      0x00000001
 #define DLM_ASSERT_MASTER_REQUERY          0x00000002
 #define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
@@ -430,6 +436,8 @@
 	u8 name[O2NM_MAX_NAME_LEN];
 };
 
+#define DLM_MIGRATE_RESPONSE_MASTERY_REF   0x00000001
+
 struct dlm_migrate_request
 {
 	u8 master;
@@ -648,6 +656,16 @@
 	__be32 pad2;
 };
 
+struct dlm_deref_lockres
+{
+	u32 pad1;
+	u16 pad2;
+	u8 node_idx;
+	u8 namelen;
+
+	u8 name[O2NM_MAX_NAME_LEN];
+};
+
 static inline enum dlm_status
 __dlm_lockres_state_to_status(struct dlm_lock_resource *res)
 {
@@ -721,8 +739,8 @@
 			      struct dlm_lock_resource *res);
 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 			    struct dlm_lock_resource *res);
-void dlm_purge_lockres(struct dlm_ctxt *dlm,
-		       struct dlm_lock_resource *lockres);
+int dlm_purge_lockres(struct dlm_ctxt *dlm,
+		      struct dlm_lock_resource *lockres);
 static inline void dlm_lockres_get(struct dlm_lock_resource *res)
 {
 	/* This is called on every lookup, so it might be worth
@@ -733,6 +751,10 @@
 void __dlm_unhash_lockres(struct dlm_lock_resource *res);
 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 			  struct dlm_lock_resource *res);
+struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
+						     const char *name,
+						     unsigned int len,
+						     unsigned int hash);
 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 						const char *name,
 						unsigned int len,
@@ -753,6 +775,47 @@
 					  const char *name,
 					  unsigned int namelen);
 
+#define dlm_lockres_set_refmap_bit(bit,res)  \
+	__dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
+#define dlm_lockres_clear_refmap_bit(bit,res)  \
+	__dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
+
+static inline void __dlm_lockres_set_refmap_bit(int bit,
+						struct dlm_lock_resource *res,
+						const char *file,
+						int line)
+{
+	//printk("%s:%d:%.*s: setting bit %d\n", file, line,
+	//     res->lockname.len, res->lockname.name, bit);
+	set_bit(bit, res->refmap);
+}
+
+static inline void __dlm_lockres_clear_refmap_bit(int bit,
+						  struct dlm_lock_resource *res,
+						  const char *file,
+						  int line)
+{
+	//printk("%s:%d:%.*s: clearing bit %d\n", file, line,
+	//     res->lockname.len, res->lockname.name, bit);
+	clear_bit(bit, res->refmap);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+				   struct dlm_lock_resource *res,
+				   const char *file,
+				   int line);
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+				   struct dlm_lock_resource *res,
+				   int new_lockres,
+				   const char *file,
+				   int line);
+#define dlm_lockres_drop_inflight_ref(d,r)  \
+	__dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref(d,r)  \
+	__dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref_new(d,r)  \
+	__dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
+
 void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 void dlm_do_local_ast(struct dlm_ctxt *dlm,
@@ -805,6 +868,7 @@
 int dlm_migrate_lockres(struct dlm_ctxt *dlm,
 			struct dlm_lock_resource *res,
 			u8 target);
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
 int dlm_finish_migration(struct dlm_ctxt *dlm,
 			 struct dlm_lock_resource *res,
 			 u8 old_master);
@@ -814,6 +878,7 @@
 
 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
@@ -856,10 +921,12 @@
 int dlm_init_mle_cache(void);
 void dlm_destroy_mle_cache(void);
 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
+			 struct dlm_lock_resource *res);
 void dlm_clean_master_list(struct dlm_ctxt *dlm,
 			   u8 dead_node);
 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
-
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
 int __dlm_lockres_unused(struct dlm_lock_resource *res);
 
 static inline const char * dlm_lock_mode_name(int mode)