libceph: protect osdc->osd_lru list with a spinlock

OSD client is getting moved from the big per-client lock to a set of
per-session locks.  The big rwlock would only be held for read most of
the time, so a global osdc->osd_lru needs additional protection.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2415dc0..486d681 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -224,6 +224,7 @@
 	struct mutex           request_mutex;
 	struct rb_root         osds;          /* osds */
 	struct list_head       osd_lru;       /* idle osds */
+	spinlock_t             osd_lru_lock;
 	u64                    last_tid;      /* tid of last request */
 	struct rb_root         requests;      /* pending requests */
 	struct list_head       req_lru;	      /* in-flight lru */
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index b6950c2..d1c8e06 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1101,31 +1101,37 @@
 	}
 }
 
-static void __move_osd_to_lru(struct ceph_osd_client *osdc,
-			      struct ceph_osd *osd)
+static void __move_osd_to_lru(struct ceph_osd *osd)
 {
-	dout("%s %p\n", __func__, osd);
+	struct ceph_osd_client *osdc = osd->o_osdc;
+
+	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
 	BUG_ON(!list_empty(&osd->o_osd_lru));
 
+	spin_lock(&osdc->osd_lru_lock);
 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
+	spin_unlock(&osdc->osd_lru_lock);
+
 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
 }
 
-static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
-				  struct ceph_osd *osd)
+static void maybe_move_osd_to_lru(struct ceph_osd *osd)
 {
-	dout("%s %p\n", __func__, osd);
-
 	if (list_empty(&osd->o_requests) &&
 	    list_empty(&osd->o_linger_requests))
-		__move_osd_to_lru(osdc, osd);
+		__move_osd_to_lru(osd);
 }
 
 static void __remove_osd_from_lru(struct ceph_osd *osd)
 {
-	dout("__remove_osd_from_lru %p\n", osd);
+	struct ceph_osd_client *osdc = osd->o_osdc;
+
+	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+	spin_lock(&osdc->osd_lru_lock);
 	if (!list_empty(&osd->o_osd_lru))
 		list_del_init(&osd->o_osd_lru);
+	spin_unlock(&osdc->osd_lru_lock);
 }
 
 /*
@@ -1199,7 +1205,7 @@
 		ceph_msg_revoke(req->r_request);
 
 		list_del_init(&req->r_osd_item);
-		maybe_move_osd_to_lru(osdc, req->r_osd);
+		maybe_move_osd_to_lru(req->r_osd);
 		if (list_empty(&req->r_linger_osd_item))
 			req->r_osd = NULL;
 	}
@@ -1248,7 +1254,7 @@
 
 	if (req->r_osd) {
 		list_del_init(&req->r_linger_osd_item);
-		maybe_move_osd_to_lru(osdc, req->r_osd);
+		maybe_move_osd_to_lru(req->r_osd);
 		if (list_empty(&req->r_osd_item))
 			req->r_osd = NULL;
 	}
@@ -2792,6 +2798,7 @@
 	osdc->last_tid = 0;
 	osdc->osds = RB_ROOT;
 	INIT_LIST_HEAD(&osdc->osd_lru);
+	spin_lock_init(&osdc->osd_lru_lock);
 	osdc->requests = RB_ROOT;
 	INIT_LIST_HEAD(&osdc->req_lru);
 	INIT_LIST_HEAD(&osdc->req_unsent);