ceph: fix osd request submission race

The osd request submission path registers the request, drops and retakes
the request_mutex, then sends it to the OSD.  A racing kick_requests could
sent it during that interval, causing the same msg to be sent twice and
BUGing in the msgr.

Fix by only sending the message if it hasn't been touched by other
threads.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 978593a..d14019d 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -837,7 +837,8 @@
 		}
 
 kick:
-		dout("kicking tid %llu osd%d\n", req->r_tid, req->r_osd->o_osd);
+		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
+		     req->r_osd->o_osd);
 		req->r_flags |= CEPH_OSD_FLAG_RETRY;
 		err = __send_request(osdc, req);
 		if (err) {
@@ -1016,7 +1017,7 @@
 			    struct ceph_osd_request *req,
 			    bool nofail)
 {
-	int rc;
+	int rc = 0;
 
 	req->r_request->pages = req->r_pages;
 	req->r_request->nr_pages = req->r_num_pages;
@@ -1025,15 +1026,22 @@
 
 	down_read(&osdc->map_sem);
 	mutex_lock(&osdc->request_mutex);
-	rc = __send_request(osdc, req);
-	if (rc) {
-		if (nofail) {
-			dout("osdc_start_request failed send, marking %lld\n",
-			     req->r_tid);
-			req->r_resend = true;
-			rc = 0;
-		} else {
-			__unregister_request(osdc, req);
+	/*
+	 * a racing kick_requests() may have sent the message for us
+	 * while we dropped request_mutex above, so only send now if
+	 * the request still han't been touched yet.
+	 */
+	if (req->r_sent == 0) {
+		rc = __send_request(osdc, req);
+		if (rc) {
+			if (nofail) {
+				dout("osdc_start_request failed send, "
+				     " marking %lld\n", req->r_tid);
+				req->r_resend = true;
+				rc = 0;
+			} else {
+				__unregister_request(osdc, req);
+			}
 		}
 	}
 	mutex_unlock(&osdc->request_mutex);