dlm: record full callback state

Change how callbacks are recorded for locks.  Previously, information
about multiple callbacks was combined into a couple of variables that
indicated what the end result should be.  In some situations, we
could not tell from this combined state what the exact sequence of
callbacks was, and would end up either delivering the callbacks in
the wrong order or suppressing redundant callbacks incorrectly.  This
new approach records all the data for each callback, leaving no
uncertainty about what needs to be delivered.

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 4314f0d..abc49f2 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -18,6 +18,7 @@
 
 #define WAKE_ASTS  0
 
+static uint64_t			ast_seq_count;
 static struct list_head		ast_queue;
 static spinlock_t		ast_queue_lock;
 static struct task_struct *	astd_task;
@@ -25,40 +26,186 @@
 static struct mutex		astd_running;
 
 
+static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
+{
+	int i;
+
+	log_print("last_bast %x %llu flags %x mode %d sb %d %x",
+		  lkb->lkb_id,
+		  (unsigned long long)lkb->lkb_last_bast.seq,
+		  lkb->lkb_last_bast.flags,
+		  lkb->lkb_last_bast.mode,
+		  lkb->lkb_last_bast.sb_status,
+		  lkb->lkb_last_bast.sb_flags);
+
+	log_print("last_cast %x %llu flags %x mode %d sb %d %x",
+		  lkb->lkb_id,
+		  (unsigned long long)lkb->lkb_last_cast.seq,
+		  lkb->lkb_last_cast.flags,
+		  lkb->lkb_last_cast.mode,
+		  lkb->lkb_last_cast.sb_status,
+		  lkb->lkb_last_cast.sb_flags);
+
+	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
+		log_print("cb %x %llu flags %x mode %d sb %d %x",
+			  lkb->lkb_id,
+			  (unsigned long long)lkb->lkb_callbacks[i].seq,
+			  lkb->lkb_callbacks[i].flags,
+			  lkb->lkb_callbacks[i].mode,
+			  lkb->lkb_callbacks[i].sb_status,
+			  lkb->lkb_callbacks[i].sb_flags);
+	}
+}
+
 void dlm_del_ast(struct dlm_lkb *lkb)
 {
 	spin_lock(&ast_queue_lock);
-	if (lkb->lkb_ast_type & (AST_COMP | AST_BAST))
-		list_del(&lkb->lkb_astqueue);
+	if (!list_empty(&lkb->lkb_astqueue))
+		list_del_init(&lkb->lkb_astqueue);
 	spin_unlock(&ast_queue_lock);
 }
 
-void dlm_add_ast(struct dlm_lkb *lkb, int type, int mode)
+int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
+			 int status, uint32_t sbflags, uint64_t seq)
 {
+	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
+	uint64_t prev_seq;
+	int prev_mode;
+	int i;
+
+	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
+		if (lkb->lkb_callbacks[i].seq)
+			continue;
+
+		/*
+		 * Suppress some redundant basts here, do more on removal.
+		 * Don't even add a bast if the callback just before it
+		 * is a bast for the same mode or a more restrictive mode.
+		 * (the additional > PR check is needed for PR/CW inversion)
+		 */
+
+		if ((i > 0) && (flags & DLM_CB_BAST) &&
+		    (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {
+
+			prev_seq = lkb->lkb_callbacks[i-1].seq;
+			prev_mode = lkb->lkb_callbacks[i-1].mode;
+
+			if ((prev_mode == mode) ||
+			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
+
+				log_debug(ls, "skip %x add bast %llu mode %d "
+					  "for bast %llu mode %d",
+					  lkb->lkb_id,
+					  (unsigned long long)seq,
+					  mode,
+					  (unsigned long long)prev_seq,
+					  prev_mode);
+				return 0;
+			}
+		}
+
+		lkb->lkb_callbacks[i].seq = seq;
+		lkb->lkb_callbacks[i].flags = flags;
+		lkb->lkb_callbacks[i].mode = mode;
+		lkb->lkb_callbacks[i].sb_status = status;
+		lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
+		break;
+	}
+
+	if (i == DLM_CALLBACKS_SIZE) {
+		log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
+			  lkb->lkb_id, (unsigned long long)seq,
+			  flags, mode, status, sbflags);
+		dlm_dump_lkb_callbacks(lkb);
+		return -1;
+	}
+
+	return 0;
+}
+
+int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
+			 struct dlm_callback *cb, int *resid)
+{
+	int i;
+
+	*resid = 0;
+
+	if (!lkb->lkb_callbacks[0].seq)
+		return -ENOENT;
+
+	/* oldest undelivered cb is callbacks[0] */
+
+	memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
+	memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));
+
+	/* shift others down */
+
+	for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
+		if (!lkb->lkb_callbacks[i].seq)
+			break;
+		memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
+		       sizeof(struct dlm_callback));
+		memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
+		(*resid)++;
+	}
+
+	/* if cb is a bast, it should be skipped if the blocking mode is
+	   compatible with the last granted mode */
+
+	if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
+		if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
+			cb->flags |= DLM_CB_SKIP;
+
+			log_debug(ls, "skip %x bast %llu mode %d "
+				  "for cast %llu mode %d",
+				  lkb->lkb_id,
+				  (unsigned long long)cb->seq,
+				  cb->mode,
+				  (unsigned long long)lkb->lkb_last_cast.seq,
+				  lkb->lkb_last_cast.mode);
+			return 0;
+		}
+	}
+
+	if (cb->flags & DLM_CB_CAST) {
+		memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
+		lkb->lkb_last_cast_time = ktime_get();
+	}
+
+	if (cb->flags & DLM_CB_BAST) {
+		memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
+		lkb->lkb_last_bast_time = ktime_get();
+	}
+
+	return 0;
+}
+
+void dlm_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
+		 uint32_t sbflags)
+{
+	uint64_t seq;
+	int rv;
+
+	spin_lock(&ast_queue_lock);
+
+	seq = ++ast_seq_count;
+
 	if (lkb->lkb_flags & DLM_IFL_USER) {
-		dlm_user_add_ast(lkb, type, mode);
+		spin_unlock(&ast_queue_lock);
+		dlm_user_add_ast(lkb, flags, mode, status, sbflags, seq);
 		return;
 	}
 
-	spin_lock(&ast_queue_lock);
-	if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
-		kref_get(&lkb->lkb_ref);
-		list_add_tail(&lkb->lkb_astqueue, &ast_queue);
-		lkb->lkb_ast_first = type;
+	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
+	if (rv < 0) {
+		spin_unlock(&ast_queue_lock);
+		return;
 	}
 
-	/* sanity check, this should not happen */
-
-	if ((type == AST_COMP) && (lkb->lkb_ast_type & AST_COMP))
-		log_print("repeat cast %d castmode %d lock %x %s",
-			  mode, lkb->lkb_castmode,
-			  lkb->lkb_id, lkb->lkb_resource->res_name);
-
-	lkb->lkb_ast_type |= type;
-	if (type == AST_BAST)
-		lkb->lkb_bastmode = mode;
-	else
-		lkb->lkb_castmode = mode;
+	if (list_empty(&lkb->lkb_astqueue)) {
+		kref_get(&lkb->lkb_ref);
+		list_add_tail(&lkb->lkb_astqueue, &ast_queue);
+	}
 	spin_unlock(&ast_queue_lock);
 
 	set_bit(WAKE_ASTS, &astd_wakeflags);
@@ -72,7 +219,8 @@
 	struct dlm_lkb *lkb;
 	void (*castfn) (void *astparam);
 	void (*bastfn) (void *astparam, int mode);
-	int type, first, bastmode, castmode, do_bast, do_cast, last_castmode;
+	struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
+	int i, rv, resid;
 
 repeat:
 	spin_lock(&ast_queue_lock);
@@ -83,54 +231,45 @@
 		if (dlm_locking_stopped(ls))
 			continue;
 
-		list_del(&lkb->lkb_astqueue);
-		type = lkb->lkb_ast_type;
-		lkb->lkb_ast_type = 0;
-		first = lkb->lkb_ast_first;
-		lkb->lkb_ast_first = 0;
-		bastmode = lkb->lkb_bastmode;
-		castmode = lkb->lkb_castmode;
+		/* we remove from astqueue list and remove everything in
+		   lkb_callbacks before releasing the spinlock so empty
+		   lkb_astqueue is always consistent with empty lkb_callbacks */
+
+		list_del_init(&lkb->lkb_astqueue);
+
 		castfn = lkb->lkb_astfn;
 		bastfn = lkb->lkb_bastfn;
+
+		memset(&callbacks, 0, sizeof(callbacks));
+
+		for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
+			rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
+			if (rv < 0)
+				break;
+		}
 		spin_unlock(&ast_queue_lock);
 
-		do_cast = (type & AST_COMP) && castfn;
-		do_bast = (type & AST_BAST) && bastfn;
-
-		/* Skip a bast if its blocking mode is compatible with the
-		   granted mode of the preceding cast. */
-
-		if (do_bast) {
-			if (first == AST_COMP)
-				last_castmode = castmode;
-			else
-				last_castmode = lkb->lkb_castmode_done;
-			if (dlm_modes_compat(bastmode, last_castmode))
-				do_bast = 0;
+		if (resid) {
+			/* shouldn't happen, for loop should have removed all */
+			log_error(ls, "callback resid %d lkb %x",
+				  resid, lkb->lkb_id);
 		}
 
-		if (first == AST_COMP) {
-			if (do_cast)
+		for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
+			if (!callbacks[i].seq)
+				break;
+			if (callbacks[i].flags & DLM_CB_SKIP) {
+				continue;
+			} else if (callbacks[i].flags & DLM_CB_BAST) {
+				bastfn(lkb->lkb_astparam, callbacks[i].mode);
+			} else if (callbacks[i].flags & DLM_CB_CAST) {
+				lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
+				lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
 				castfn(lkb->lkb_astparam);
-			if (do_bast)
-				bastfn(lkb->lkb_astparam, bastmode);
-		} else if (first == AST_BAST) {
-			if (do_bast)
-				bastfn(lkb->lkb_astparam, bastmode);
-			if (do_cast)
-				castfn(lkb->lkb_astparam);
-		} else {
-			log_error(ls, "bad ast_first %d ast_type %d",
-				  first, type);
+			}
 		}
 
-		if (do_cast)
-			lkb->lkb_castmode_done = castmode;
-		if (do_bast)
-			lkb->lkb_bastmode_done = bastmode;
-
-		/* this removes the reference added by dlm_add_ast
-		   and may result in the lkb being freed */
+		/* removes ref for ast_queue, may cause lkb to be freed */
 		dlm_put_lkb(lkb);
 
 		cond_resched();