[DLM] block dlm_recv in recovery transition

Introduce a per-lockspace rwsem that's held in read mode by dlm_recv
threads while working in the dlm. This allows dlm_recv activity to be
suspended when the lockspace transitions to, from and between recovery
cycles.

The specific bug prompting this change is one where an in-progress
recovery cycle is aborted by a new recovery cycle. While dlm_recv was
processing a recovery message, the recovery cycle was aborted and
dlm_recoverd began cleaning up. dlm_recv decremented recover_locks_count
on an rsb after dlm_recoverd had reset it to zero. This is fixed by
suspending dlm_recv (taking the write lock on the rwsem) before aborting
the current recovery.

The transitions to/from normal and recovery modes are simplified by using
this new ability to block dlm_recv. The switch from normal to recovery
mode means dlm_recv goes from processing locking messages, to saving them
for later, and vice versa. Races are avoided by blocking dlm_recv when
setting the flag that switches between modes.

Signed-off-by: David Teigland <teigland@redhat.com>
Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 6657599..4b89e20 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -24,19 +24,28 @@
 /* If the start for which we're re-enabling locking (seq) has been superseded
-   by a newer stop (ls_recover_seq), we need to leave locking disabled. */
+   by a newer stop (ls_recover_seq), we need to leave locking disabled.
+
+   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
+   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
+   enables locking and clears the requestqueue between a and b. */
 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 {
 	int error = -EINTR;
+	down_write(&ls->ls_recv_active);
+
 	spin_lock(&ls->ls_recover_lock);
 	if (ls->ls_recover_seq == seq) {
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
+		/* unblocks processes waiting to enter the dlm */
 		up_write(&ls->ls_in_recovery);
 		error = 0;
 	}
 	spin_unlock(&ls->ls_recover_lock);
+
+	up_write(&ls->ls_recv_active);
 	return error;
 }