rds: use RCU to synchronize work-enqueue with connection teardown

rds_sendmsg() can enqueue work on cp_send_w from process context, but
it should not enqueue this work if connection teardown has commenced
(else we risk enqueuing work after rds_conn_path_destroy() has assumed that
all work has been cancelled/flushed).

Similarly, some other functions like rds_cong_queue_updates() and
rds_tcp_data_ready() are called in softirq context, and may end up
enqueuing work on rds_wq after rds_conn_path_destroy() has assumed
that all workqueues are quiesced.

Check the RDS_DESTROY_PENDING bit and use RCU synchronization to avoid
all these races.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rds/threads.c b/net/rds/threads.c
index f121daa..eb76db1 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -87,8 +87,12 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 
 	cp->cp_reconnect_jiffies = 0;
 	set_bit(0, &cp->cp_conn->c_map_queued);
-	queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-	queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+	rcu_read_lock();
+	if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags)) {
+		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+	}
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
 
@@ -133,7 +137,10 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
 	set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
 	if (cp->cp_reconnect_jiffies == 0) {
 		cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
-		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+		rcu_read_lock();
+		if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+			queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
+		rcu_read_unlock();
 		return;
 	}
 
@@ -141,8 +148,11 @@ void rds_queue_reconnect(struct rds_conn_path *cp)
 	rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
 		 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
 		 conn, &conn->c_laddr, &conn->c_faddr);
-	queue_delayed_work(rds_wq, &cp->cp_conn_w,
-			   rand % cp->cp_reconnect_jiffies);
+	rcu_read_lock();
+	if (!test_bit(RDS_DESTROY_PENDING, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_conn_w,
+				   rand % cp->cp_reconnect_jiffies);
+	rcu_read_unlock();
 
 	cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
 					rds_sysctl_reconnect_max_jiffies);