libceph: fix msgr standby handling
The standby logic used to be pretty dependent on the work requeueing
behavior that changed when we switched to WQ_NON_REENTRANT. It was also
very fragile.
Restructure things so that:
- We clear WRITE_PENDING when we set STANDBY. This ensures we will
requeue work when we wake up later.
- con_work backs off if STANDBY is set. There is nothing to do if we are
in standby.
- clear_standby() helper is called by both con_send() and con_keepalive(),
the two actions that can wake us up again. Move the connect_seq++
logic here.
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3252ea9..05f3578 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1712,14 +1712,6 @@
/* open the socket first? */
if (con->sock == NULL) {
- /*
- * if we were STANDBY and are reconnecting _this_
- * connection, bump connect_seq now. Always bump
- * global_seq.
- */
- if (test_and_clear_bit(STANDBY, &con->state))
- con->connect_seq++;
-
prepare_write_banner(msgr, con);
prepare_write_connect(msgr, con, 1);
prepare_read_banner(con);
@@ -1962,6 +1954,10 @@
}
}
+ if (test_bit(STANDBY, &con->state)) {
+ dout("con_work %p STANDBY\n", con);
+ goto done;
+ }
if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
dout("con_work CLOSED\n");
con_close_socket(con);
@@ -2022,6 +2018,8 @@
* the connection in a STANDBY state */
if (list_empty(&con->out_queue) &&
!test_bit(KEEPALIVE_PENDING, &con->state)) {
+ dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
+ clear_bit(WRITE_PENDING, &con->state);
set_bit(STANDBY, &con->state);
} else {
/* retry after a delay. */
@@ -2117,6 +2115,19 @@
}
EXPORT_SYMBOL(ceph_messenger_destroy);
+static void clear_standby(struct ceph_connection *con)
+{
+ /* come back from STANDBY? */
+ if (test_and_clear_bit(STANDBY, &con->state)) {
+ mutex_lock(&con->mutex);
+ dout("clear_standby %p and ++connect_seq\n", con);
+ con->connect_seq++;
+ WARN_ON(test_bit(WRITE_PENDING, &con->state));
+ WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
+ mutex_unlock(&con->mutex);
+ }
+}
+
/*
* Queue up an outgoing message on the given connection.
*/
@@ -2149,6 +2160,7 @@
/* if there wasn't anything waiting to send before, queue
* new work */
+ clear_standby(con);
if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
queue_con(con);
}
@@ -2214,6 +2226,8 @@
*/
void ceph_con_keepalive(struct ceph_connection *con)
{
+ dout("con_keepalive %p\n", con);
+ clear_standby(con);
if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
test_and_set_bit(WRITE_PENDING, &con->state) == 0)
queue_con(con);