gfs2: Clean up glock work enqueuing
This patch adds a standardized queueing mechanism for glock work
with spin_lock protection to prevent races.
Signed-off-by: Andreas Gruenbacher <agruenba@redhat.com>
Signed-off-by: Bob Peterson <rpeterso@redhat.com>
diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c
index 959a19c..6cd71c5 100644
--- a/fs/gfs2/glock.c
+++ b/fs/gfs2/glock.c
@@ -152,20 +152,34 @@
spin_unlock(&lru_lock);
}
-/**
- * gfs2_glock_put() - Decrement reference count on glock
- * @gl: The glock to put
- *
+/*
+ * Enqueue the glock on the work queue. Passes one glock reference on to the
+ * work queue.
*/
+static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
+ if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) {
+ /*
+ * We are holding the lockref spinlock, and the work was still
+ * queued above. The queued work (glock_work_func) takes that
+ * spinlock before dropping its glock reference(s), so it
+ * cannot have dropped them in the meantime.
+ */
+ GLOCK_BUG_ON(gl, gl->gl_lockref.count < 2);
+ gl->gl_lockref.count--;
+ }
+}
-void gfs2_glock_put(struct gfs2_glock *gl)
+static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
+ spin_lock(&gl->gl_lockref.lock);
+ __gfs2_glock_queue_work(gl, delay);
+ spin_unlock(&gl->gl_lockref.lock);
+}
+
+static void __gfs2_glock_put(struct gfs2_glock *gl)
{
struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
struct address_space *mapping = gfs2_glock2aspace(gl);
- if (lockref_put_or_lock(&gl->gl_lockref))
- return;
-
lockref_mark_dead(&gl->gl_lockref);
gfs2_glock_remove_from_lru(gl);
@@ -178,6 +192,20 @@
}
/**
+ * gfs2_glock_put() - Decrement reference count on glock
+ * @gl: The glock to put
+ *
+ */
+
+void gfs2_glock_put(struct gfs2_glock *gl)
+{
+ if (lockref_put_or_lock(&gl->gl_lockref))
+ return;
+
+ __gfs2_glock_put(gl);
+}
+
+/**
* may_grant - check if its ok to grant a new lock
* @gl: The glock
* @gh: The lock request which we wish to grant
@@ -482,8 +510,7 @@
target == LM_ST_UNLOCKED &&
test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) {
finish_xmote(gl, target);
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gfs2_glock_put(gl);
+ gfs2_glock_queue_work(gl, 0);
}
else if (ret) {
pr_err("lm_lock ret %d\n", ret);
@@ -492,8 +519,7 @@
}
} else { /* lock_nolock */
finish_xmote(gl, target);
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gfs2_glock_put(gl);
+ gfs2_glock_queue_work(gl, 0);
}
spin_lock(&gl->gl_lockref.lock);
@@ -565,8 +591,7 @@
clear_bit(GLF_LOCK, &gl->gl_flags);
smp_mb__after_atomic();
gl->gl_lockref.count++;
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gl->gl_lockref.count--;
+ __gfs2_glock_queue_work(gl, 0);
return;
out_unlock:
@@ -601,11 +626,11 @@
{
unsigned long delay = 0;
struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work);
- int drop_ref = 0;
+ unsigned int drop_refs = 1;
if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) {
finish_xmote(gl, gl->gl_reply);
- drop_ref = 1;
+ drop_refs++;
}
spin_lock(&gl->gl_lockref.lock);
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
@@ -623,17 +648,25 @@
}
}
run_queue(gl, 0);
- spin_unlock(&gl->gl_lockref.lock);
- if (!delay)
- gfs2_glock_put(gl);
- else {
+ if (delay) {
+ /* Keep one glock reference for the work we requeue. */
+ drop_refs--;
if (gl->gl_name.ln_type != LM_TYPE_INODE)
delay = 0;
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
- gfs2_glock_put(gl);
+ __gfs2_glock_queue_work(gl, delay);
}
- if (drop_ref)
- gfs2_glock_put(gl);
+
+ /*
+ * Drop the remaining glock references manually here. (Mind that
+ * __gfs2_glock_queue_work depends on the lockref spinlock begin held
+ * here as well.)
+ */
+ gl->gl_lockref.count -= drop_refs;
+ if (!gl->gl_lockref.count) {
+ __gfs2_glock_put(gl);
+ return;
+ }
+ spin_unlock(&gl->gl_lockref.lock);
}
/**
@@ -986,8 +1019,7 @@
test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) {
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
gl->gl_lockref.count++;
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gl->gl_lockref.count--;
+ __gfs2_glock_queue_work(gl, 0);
}
run_queue(gl, 1);
spin_unlock(&gl->gl_lockref.lock);
@@ -1047,17 +1079,15 @@
gfs2_glock_add_to_lru(gl);
trace_gfs2_glock_queue(gh, 0);
+ if (unlikely(!fast_path)) {
+ gl->gl_lockref.count++;
+ if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
+ !test_bit(GLF_DEMOTE, &gl->gl_flags) &&
+ gl->gl_name.ln_type == LM_TYPE_INODE)
+ delay = gl->gl_hold_time;
+ __gfs2_glock_queue_work(gl, delay);
+ }
spin_unlock(&gl->gl_lockref.lock);
- if (likely(fast_path))
- return;
-
- gfs2_glock_hold(gl);
- if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
- !test_bit(GLF_DEMOTE, &gl->gl_flags) &&
- gl->gl_name.ln_type == LM_TYPE_INODE)
- delay = gl->gl_hold_time;
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
- gfs2_glock_put(gl);
}
void gfs2_glock_dq_wait(struct gfs2_holder *gh)
@@ -1233,9 +1263,8 @@
spin_lock(&gl->gl_lockref.lock);
handle_callback(gl, state, delay, true);
+ __gfs2_glock_queue_work(gl, delay);
spin_unlock(&gl->gl_lockref.lock);
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
- gfs2_glock_put(gl);
}
/**
@@ -1294,10 +1323,8 @@
gl->gl_lockref.count++;
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ __gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
-
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gfs2_glock_put(gl);
}
static int glock_cmp(void *priv, struct list_head *a, struct list_head *b)
@@ -1355,8 +1382,7 @@
if (demote_ok(gl))
handle_callback(gl, LM_ST_UNLOCKED, 0, false);
WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags));
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gl->gl_lockref.count--;
+ __gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
cond_resched_lock(&lru_lock);
}
@@ -1462,13 +1488,12 @@
static void thaw_glock(struct gfs2_glock *gl)
{
- if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
- goto out;
- set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) {
-out:
+ if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) {
gfs2_glock_put(gl);
+ return;
}
+ set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+ gfs2_glock_queue_work(gl, 0);
}
/**
@@ -1484,9 +1509,8 @@
spin_lock(&gl->gl_lockref.lock);
if (gl->gl_state != LM_ST_UNLOCKED)
handle_callback(gl, LM_ST_UNLOCKED, 0, false);
+ __gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock);
- if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
- gfs2_glock_put(gl);
}
/**