dlm: record full callback state

Change how callbacks are recorded for locks.  Previously, information
about multiple callbacks was combined into a couple of variables that
indicated what the end result should be.  In some situations, we
could not tell from this combined state what the exact sequence of
callbacks were, and would end up either delivering the callbacks in
the wrong order, or suppress redundant callbacks incorrectly.  This
new approach records all the data for each callback, leaving no
uncertainty about what needs to be delivered.

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index 66d6c16..d5ab3fe 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -24,6 +24,7 @@
 #include "lock.h"
 #include "lvb_table.h"
 #include "user.h"
+#include "ast.h"
 
 static const char name_prefix[] = "dlm";
 static const struct file_operations device_fops;
@@ -152,19 +153,16 @@
    not related to the lifetime of the lkb struct which is managed
    entirely by refcount. */
 
-static int lkb_is_endoflife(struct dlm_lkb *lkb, int sb_status, int type)
+static int lkb_is_endoflife(int mode, int status)
 {
-	switch (sb_status) {
+	switch (status) {
 	case -DLM_EUNLOCK:
 		return 1;
 	case -DLM_ECANCEL:
 	case -ETIMEDOUT:
 	case -EDEADLK:
-		if (lkb->lkb_grmode == DLM_LOCK_IV)
-			return 1;
-		break;
 	case -EAGAIN:
-		if (type == AST_COMP && lkb->lkb_grmode == DLM_LOCK_IV)
+		if (mode == DLM_LOCK_IV)
 			return 1;
 		break;
 	}
@@ -174,12 +172,13 @@
 /* we could possibly check if the cancel of an orphan has resulted in the lkb
    being removed and then remove that lkb from the orphans list and free it */
 
-void dlm_user_add_ast(struct dlm_lkb *lkb, int type, int mode)
+void dlm_user_add_ast(struct dlm_lkb *lkb, uint32_t flags, int mode,
+		      int status, uint32_t sbflags, uint64_t seq)
 {
 	struct dlm_ls *ls;
 	struct dlm_user_args *ua;
 	struct dlm_user_proc *proc;
-	int eol = 0, ast_type;
+	int rv;
 
 	if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD))
 		return;
@@ -200,49 +199,29 @@
 	ua = lkb->lkb_ua;
 	proc = ua->proc;
 
-	if (type == AST_BAST && ua->bastaddr == NULL)
+	if ((flags & DLM_CB_BAST) && ua->bastaddr == NULL)
 		goto out;
 
+	if ((flags & DLM_CB_CAST) && lkb_is_endoflife(mode, status))
+		lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
+
 	spin_lock(&proc->asts_spin);
 
-	ast_type = lkb->lkb_ast_type;
-	lkb->lkb_ast_type |= type;
-	if (type == AST_BAST)
-		lkb->lkb_bastmode = mode;
-	else
-		lkb->lkb_castmode = mode;
+	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, seq);
+	if (rv < 0) {
+		spin_unlock(&proc->asts_spin);
+		goto out;
+	}
 
-	if (!ast_type) {
+	if (list_empty(&lkb->lkb_astqueue)) {
 		kref_get(&lkb->lkb_ref);
 		list_add_tail(&lkb->lkb_astqueue, &proc->asts);
-		lkb->lkb_ast_first = type;
 		wake_up_interruptible(&proc->wait);
 	}
-	if (type == AST_COMP && (ast_type & AST_COMP))
-		log_debug(ls, "ast overlap %x status %x %x",
-			  lkb->lkb_id, ua->lksb.sb_status, lkb->lkb_flags);
-
-	eol = lkb_is_endoflife(lkb, ua->lksb.sb_status, type);
-	if (eol) {
-		lkb->lkb_flags |= DLM_IFL_ENDOFLIFE;
-	}
-
-	/* We want to copy the lvb to userspace when the completion
-	   ast is read if the status is 0, the lock has an lvb and
-	   lvb_ops says we should.  We could probably have set_lvb_lock()
-	   set update_user_lvb instead and not need old_mode */
-
-	if ((lkb->lkb_ast_type & AST_COMP) &&
-	    (lkb->lkb_lksb->sb_status == 0) &&
-	    lkb->lkb_lksb->sb_lvbptr &&
-	    dlm_lvb_operations[ua->old_mode + 1][lkb->lkb_grmode + 1])
-		ua->update_user_lvb = 1;
-	else
-		ua->update_user_lvb = 0;
-
 	spin_unlock(&proc->asts_spin);
 
-	if (eol) {
+	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
+		/* N.B. spin_lock locks_spin, not asts_spin */
 		spin_lock(&proc->locks_spin);
 		if (!list_empty(&lkb->lkb_ownqueue)) {
 			list_del_init(&lkb->lkb_ownqueue);
@@ -705,8 +684,9 @@
 	return 0;
 }
 
-static int copy_result_to_user(struct dlm_user_args *ua, int compat, int type,
-			       int mode, char __user *buf, size_t count)
+static int copy_result_to_user(struct dlm_user_args *ua, int compat,
+			       uint32_t flags, int mode, int copy_lvb,
+			       char __user *buf, size_t count)
 {
 #ifdef CONFIG_COMPAT
 	struct dlm_lock_result32 result32;
@@ -730,7 +710,7 @@
 	   notes that a new blocking AST address and parameter are set even if
 	   the conversion fails, so maybe we should just do that. */
 
-	if (type == AST_BAST) {
+	if (flags & DLM_CB_BAST) {
 		result.user_astaddr = ua->bastaddr;
 		result.user_astparam = ua->bastparam;
 		result.bast_mode = mode;
@@ -750,8 +730,7 @@
 	/* copy lvb to userspace if there is one, it's been updated, and
 	   the user buffer has space for it */
 
-	if (ua->update_user_lvb && ua->lksb.sb_lvbptr &&
-	    count >= len + DLM_USER_LVB_LEN) {
+	if (copy_lvb && ua->lksb.sb_lvbptr && count >= len + DLM_USER_LVB_LEN) {
 		if (copy_to_user(buf+len, ua->lksb.sb_lvbptr,
 				 DLM_USER_LVB_LEN)) {
 			error = -EFAULT;
@@ -801,13 +780,12 @@
 	struct dlm_user_proc *proc = file->private_data;
 	struct dlm_lkb *lkb;
 	DECLARE_WAITQUEUE(wait, current);
-	int error = 0, removed;
-	int ret_type, ret_mode;
-	int bastmode, castmode, do_bast, do_cast;
+	struct dlm_callback cb;
+	int rv, resid, copy_lvb = 0;
 
 	if (count == sizeof(struct dlm_device_version)) {
-		error = copy_version_to_user(buf, count);
-		return error;
+		rv = copy_version_to_user(buf, count);
+		return rv;
 	}
 
 	if (!proc) {
@@ -854,92 +832,57 @@
 		}
 	}
 
-	/* there may be both completion and blocking asts to return for
-	   the lkb, don't remove lkb from asts list unless no asts remain */
+	/* if we empty lkb_callbacks, we don't want to unlock the spinlock
+	   without removing lkb_astqueue; so empty lkb_astqueue is always
+	   consistent with empty lkb_callbacks */
 
 	lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue);
 
-	removed = 0;
-	ret_type = 0;
-	ret_mode = 0;
-	do_bast = lkb->lkb_ast_type & AST_BAST;
-	do_cast = lkb->lkb_ast_type & AST_COMP;
-	bastmode = lkb->lkb_bastmode;
-	castmode = lkb->lkb_castmode;
-
-	/* when both are queued figure out which to do first and
-	   switch first so the other goes in the next read */
-
-	if (do_cast && do_bast) {
-		if (lkb->lkb_ast_first == AST_COMP) {
-			ret_type = AST_COMP;
-			ret_mode = castmode;
-			lkb->lkb_ast_type &= ~AST_COMP;
-			lkb->lkb_ast_first = AST_BAST;
-		} else {
-			ret_type = AST_BAST;
-			ret_mode = bastmode;
-			lkb->lkb_ast_type &= ~AST_BAST;
-			lkb->lkb_ast_first = AST_COMP;
-		}
-	} else {
-		ret_type = lkb->lkb_ast_first;
-		ret_mode = (ret_type == AST_COMP) ? castmode : bastmode;
-		lkb->lkb_ast_type &= ~ret_type;
-		lkb->lkb_ast_first = 0;
+	rv = dlm_rem_lkb_callback(lkb->lkb_resource->res_ls, lkb, &cb, &resid);
+	if (rv < 0) {
+		/* this shouldn't happen; lkb should have been removed from
+		   list when resid was zero */
+		log_print("dlm_rem_lkb_callback empty %x", lkb->lkb_id);
+		list_del_init(&lkb->lkb_astqueue);
+		spin_unlock(&proc->asts_spin);
+		/* removes ref for proc->asts, may cause lkb to be freed */
+		dlm_put_lkb(lkb);
+		goto try_another;
 	}
-
-	/* if we're doing a bast but the bast is unnecessary, then
-	   switch to do nothing or do a cast if that was needed next */
-
-	if ((ret_type == AST_BAST) &&
-	    dlm_modes_compat(bastmode, lkb->lkb_castmode_done)) {
-		ret_type = 0;
-		ret_mode = 0;
-
-		if (do_cast) {
-			ret_type = AST_COMP;
-			ret_mode = castmode;
-			lkb->lkb_ast_type &= ~AST_COMP;
-			lkb->lkb_ast_first = 0;
-		}
-	}
-
-	if (lkb->lkb_ast_first != lkb->lkb_ast_type) {
-		log_print("device_read %x ast_first %x ast_type %x",
-			  lkb->lkb_id, lkb->lkb_ast_first, lkb->lkb_ast_type);
-	}
-
-	if (!lkb->lkb_ast_type) {
-		list_del(&lkb->lkb_astqueue);
-		removed = 1;
-	}
+	if (!resid)
+		list_del_init(&lkb->lkb_astqueue);
 	spin_unlock(&proc->asts_spin);
 
-	if (ret_type) {
-		error = copy_result_to_user(lkb->lkb_ua,
-				test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
-				ret_type, ret_mode, buf, count);
-
-		if (ret_type == AST_COMP)
-			lkb->lkb_castmode_done = castmode;
-		if (ret_type == AST_BAST)
-			lkb->lkb_bastmode_done = bastmode;
+	if (cb.flags & DLM_CB_SKIP) {
+		/* removes ref for proc->asts, may cause lkb to be freed */
+		if (!resid)
+			dlm_put_lkb(lkb);
+		goto try_another;
 	}
 
-	/* removes reference for the proc->asts lists added by
-	   dlm_user_add_ast() and may result in the lkb being freed */
+	if (cb.flags & DLM_CB_CAST) {
+		int old_mode, new_mode;
 
-	if (removed)
+		old_mode = lkb->lkb_last_cast.mode;
+		new_mode = cb.mode;
+
+		if (!cb.sb_status && lkb->lkb_lksb->sb_lvbptr &&
+		    dlm_lvb_operations[old_mode + 1][new_mode + 1])
+			copy_lvb = 1;
+
+		lkb->lkb_lksb->sb_status = cb.sb_status;
+		lkb->lkb_lksb->sb_flags = cb.sb_flags;
+	}
+
+	rv = copy_result_to_user(lkb->lkb_ua,
+				 test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
+				 cb.flags, cb.mode, copy_lvb, buf, count);
+
+	/* removes ref for proc->asts, may cause lkb to be freed */
+	if (!resid)
 		dlm_put_lkb(lkb);
 
-	/* the bast that was queued was eliminated (see unnecessary above),
-	   leaving nothing to return */
-
-	if (!ret_type)
-		goto try_another;
-
-	return error;
+	return rv;
 }
 
 static unsigned int device_poll(struct file *file, poll_table *wait)