[DLM] add lock timeouts and warnings [2/6]

New features: lock timeouts and time warnings.  If the DLM_LKF_TIMEOUT
flag is set, then the request/conversion will be canceled after waiting
for the number of centiseconds specified for that lock.  This feature is
only available for locks requested through libdlm (it can be enabled for
kernel dlm users if there's a use for it).

If the new DLM_LSFL_TIMEWARN flag is set when creating the lockspace, then
a warning message will be sent to userspace (using genetlink) after a
request/conversion has been waiting for a given number of centiseconds
(configurable per node).  The time warnings will be used in the future
to do deadlock detection in userspace.

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 09668ec..ab986df 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -82,10 +82,13 @@
 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
 static int send_remove(struct dlm_rsb *r);
 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
+static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
 				    struct dlm_message *ms);
 static int receive_extralen(struct dlm_message *ms);
 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
+static void del_timeout(struct dlm_lkb *lkb);
+void dlm_timeout_warn(struct dlm_lkb *lkb);
 
 /*
  * Lock compatibilty matrix - thanks Steve
@@ -286,8 +289,17 @@
 	if (is_master_copy(lkb))
 		return;
 
+	del_timeout(lkb);
+
 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
 
+	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
+	   timeout caused the cancel then return -ETIMEDOUT */
+	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
+		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
+		rv = -ETIMEDOUT;
+	}
+
 	lkb->lkb_lksb->sb_status = rv;
 	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
 
@@ -581,6 +593,7 @@
 	kref_init(&lkb->lkb_ref);
 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
+	INIT_LIST_HEAD(&lkb->lkb_time_list);
 
 	get_random_bytes(&bucket, sizeof(bucket));
 	bucket &= (ls->ls_lkbtbl_size - 1);
@@ -993,6 +1006,125 @@
 	}
 }
 
+static void add_timeout(struct dlm_lkb *lkb)
+{
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+
+	if (is_master_copy(lkb))
+		return;
+	/* time the wait on ls_timeout if the lock carries its own timeout... */
+	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
+		goto add_it;
+	/* ...or if the lockspace wants time warnings and NODLCKWT is not set */
+	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
+	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
+		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
+		goto add_it;
+	}
+	return;
+
+ add_it:
+	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
+	mutex_lock(&ls->ls_timeout_mutex);
+	hold_lkb(lkb);		/* dropped by unhold_lkb() in del_timeout() */
+	lkb->lkb_timestamp = jiffies;	/* expiry is measured from here */
+	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
+	mutex_unlock(&ls->ls_timeout_mutex);
+}
+
+static void del_timeout(struct dlm_lkb *lkb)
+{
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+
+	mutex_lock(&ls->ls_timeout_mutex);
+	if (!list_empty(&lkb->lkb_time_list)) {	/* nothing to do if not listed */
+		list_del_init(&lkb->lkb_time_list);
+		unhold_lkb(lkb);	/* pairs with hold_lkb() in add_timeout() */
+	}
+	mutex_unlock(&ls->ls_timeout_mutex);
+}
+
+/* Scan ls_timeout, canceling and/or warning about each lkb whose interval
+   has expired, until none is left or locking is stopped.  FIXME: is it safe
+   to look at lkb_exflags, lkb_flags, lkb_timestamp, and lkb_timeout_cs
+   without lock_rsb?  Note: we can't lock timeout_mutex and then lock rsb
+   because of lock ordering in add_timeout.  We may need special timeout
+   bits in the lkb that are only accessed under the timeout_mutex. */
+void dlm_scan_timeout(struct dlm_ls *ls)
+{
+	struct dlm_rsb *r;
+	struct dlm_lkb *lkb;
+	int do_cancel, do_warn;
+
+	for (;;) {
+		if (dlm_locking_stopped(ls))
+			break;
+		/* look for the first listed lkb due for a cancel or a warning */
+		do_cancel = 0;
+		do_warn = 0;
+		mutex_lock(&ls->ls_timeout_mutex);
+		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
+
+			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
+			    time_after_eq(jiffies, lkb->lkb_timestamp +
+					  lkb->lkb_timeout_cs * HZ/100))
+				do_cancel = 1;
+
+			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
+			    time_after_eq(jiffies, lkb->lkb_timestamp +
+				   	   dlm_config.ci_timewarn_cs * HZ/100))
+				do_warn = 1;
+
+			if (!do_cancel && !do_warn)
+				continue;
+			hold_lkb(lkb);	/* paired with dlm_put_lkb() below */
+			break;
+		}
+		mutex_unlock(&ls->ls_timeout_mutex);
+		/* nothing on the list has expired, so the scan is done */
+		if (!do_cancel && !do_warn)
+			break;
+		/* the rsb is only locked once ls_timeout_mutex is released */
+		r = lkb->lkb_resource;
+		hold_rsb(r);
+		lock_rsb(r);
+
+		if (do_warn) {
+			/* clear flag so we only warn once */
+			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
+			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
+				del_timeout(lkb);
+			dlm_timeout_warn(lkb);
+		}
+		/* TIMEOUT_CANCEL makes the cancel complete with -ETIMEDOUT */
+		if (do_cancel) {
+			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
+			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
+			del_timeout(lkb);
+			_cancel_lock(r, lkb);
+		}
+
+		unlock_rsb(r);
+		unhold_rsb(r);
+		dlm_put_lkb(lkb);
+	}
+}
+
+/* Advance every lkb_timestamp on ls_timeout by the jiffies elapsed since
+   ls_recover_begin.  This is only called by dlm_recoverd, and we rely on
+   dlm_ls_stop() stopping dlm_recoverd before checking/setting that field. */
+void dlm_adjust_timeouts(struct dlm_ls *ls)
+{
+	struct dlm_lkb *lkb;
+	long adj = jiffies - ls->ls_recover_begin;
+
+	ls->ls_recover_begin = 0;
+	mutex_lock(&ls->ls_timeout_mutex);
+	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
+		lkb->lkb_timestamp += adj;
+	mutex_unlock(&ls->ls_timeout_mutex);
+}
+
 /* lkb is master or local copy */
 
 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
@@ -1902,6 +2034,9 @@
 		if (is_overlap(lkb))
 			goto out;
 
+		/* don't let scand try to do a cancel */
+		del_timeout(lkb);
+
 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
 			rv = -EBUSY;
@@ -1933,6 +2068,9 @@
 		if (is_overlap_unlock(lkb))
 			goto out;
 
+		/* don't let scand try to do a cancel */
+		del_timeout(lkb);
+
 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
 			rv = -EBUSY;
@@ -1993,6 +2131,7 @@
 		error = -EINPROGRESS;
 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
 		send_blocking_asts(r, lkb);
+		add_timeout(lkb);
 		goto out;
 	}
 
@@ -2040,6 +2179,7 @@
 		del_lkb(r, lkb);
 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
 		send_blocking_asts(r, lkb);
+		add_timeout(lkb);
 		goto out;
 	}
 
@@ -3110,9 +3250,10 @@
 		lkb->lkb_remid = ms->m_lkid;
 		if (is_altmode(lkb))
 			munge_altmode(lkb, ms);
-		if (result)
+		if (result) {
 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
-		else {
+			add_timeout(lkb);
+		} else {
 			grant_lock_pc(r, lkb, ms);
 			queue_cast(r, lkb, 0);
 		}
@@ -3178,6 +3319,7 @@
 			munge_demoted(lkb, ms);
 		del_lkb(r, lkb);
 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
+		add_timeout(lkb);
 		break;
 
 	case 0: