dlm: delayed reply message warning

Add an option (disabled by default) to print a warning message
when a lock has been waiting a configurable amount of time for
a reply message from another node.  This is mainly for debugging.

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 04b8c44..e3c8641 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -799,10 +799,84 @@
 	return -1;
 }
 
+static int nodeid_warned(int nodeid, int num_nodes, int *warned)
+{
+	int i;
+
+	for (i = 0; i < num_nodes; i++) {
+		if (!warned[i]) {
+			warned[i] = nodeid;
+			return 0;
+		}
+		if (warned[i] == nodeid)
+			return 1;
+	}
+	return 0;
+}
+
+void dlm_scan_waiters(struct dlm_ls *ls)
+{
+	struct dlm_lkb *lkb;
+	ktime_t zero = ktime_set(0, 0);
+	s64 us;
+	s64 debug_maxus = 0;
+	u32 debug_scanned = 0;
+	u32 debug_expired = 0;
+	int num_nodes = 0;
+	int *warned = NULL;
+
+	if (!dlm_config.ci_waitwarn_us)
+		return;
+
+	mutex_lock(&ls->ls_waiters_mutex);
+
+	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+		if (ktime_equal(lkb->lkb_wait_time, zero))
+			continue;
+
+		debug_scanned++;
+
+		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
+
+		if (us < dlm_config.ci_waitwarn_us)
+			continue;
+
+		lkb->lkb_wait_time = zero;
+
+		debug_expired++;
+		if (us > debug_maxus)
+			debug_maxus = us;
+
+		if (!num_nodes) {
+			num_nodes = ls->ls_num_nodes;
+			warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
+			if (warned)
+				memset(warned, 0, num_nodes * sizeof(int));
+		}
+		if (!warned)
+			continue;
+		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
+			continue;
+
+		log_error(ls, "waitwarn %x %lld %d us check connection to "
+			  "node %d", lkb->lkb_id, (long long)us,
+			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
+	}
+	mutex_unlock(&ls->ls_waiters_mutex);
+
+	if (warned)
+		kfree(warned);
+
+	if (debug_expired)
+		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
+			  debug_scanned, debug_expired,
+			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
+}
+
 /* add/remove lkb from global waiters list of lkb's waiting for
    a reply from a remote node */
 
-static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
+static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
 {
 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
 	int error = 0;
@@ -842,6 +916,8 @@
 
 	lkb->lkb_wait_count++;
 	lkb->lkb_wait_type = mstype;
+	lkb->lkb_wait_time = ktime_get();
+	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
 	hold_lkb(lkb);
 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
  out:
@@ -1157,6 +1233,16 @@
 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
 		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
 	mutex_unlock(&ls->ls_timeout_mutex);
+
+	if (!dlm_config.ci_waitwarn_us)
+		return;
+
+	mutex_lock(&ls->ls_waiters_mutex);
+	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
+		if (ktime_to_us(lkb->lkb_wait_time))
+			lkb->lkb_wait_time = ktime_get();
+	}
+	mutex_unlock(&ls->ls_waiters_mutex);
 }
 
 /* lkb is master or local copy */
@@ -2844,12 +2930,12 @@
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	error = add_to_waiters(lkb, mstype);
+	to_nodeid = r->res_nodeid;
+
+	error = add_to_waiters(lkb, mstype, to_nodeid);
 	if (error)
 		return error;
 
-	to_nodeid = r->res_nodeid;
-
 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
 	if (error)
 		goto fail;
@@ -2951,12 +3037,12 @@
 	struct dlm_mhandle *mh;
 	int to_nodeid, error;
 
-	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
+	to_nodeid = dlm_dir_nodeid(r);
+
+	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
 	if (error)
 		return error;
 
-	to_nodeid = dlm_dir_nodeid(r);
-
 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
 	if (error)
 		goto fail;