libceph: take map_sem for read in handle_reply()

Handling redirect replies requires both map_sem and request_mutex.
Taking map_sem unconditionally near the top of handle_reply() avoids
possible race conditions that arise from releasing request_mutex to be
able to acquire map_sem in redirect reply case.  (Lock ordering is:
map_sem, request_mutex, crush_mutex.)

Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2aa82b6..0676f2b 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1687,6 +1687,7 @@
 	osdmap_epoch = ceph_decode_32(&p);
 
 	/* lookup */
+	down_read(&osdc->map_sem);
 	mutex_lock(&osdc->request_mutex);
 	req = __lookup_request(osdc, tid);
 	if (req == NULL) {
@@ -1743,7 +1744,6 @@
 		dout("redirect pool %lld\n", redir.oloc.pool);
 
 		__unregister_request(osdc, req);
-		mutex_unlock(&osdc->request_mutex);
 
 		req->r_target_oloc = redir.oloc; /* struct */
 
@@ -1755,10 +1755,10 @@
 		 * successfully.  In the future we might want to follow
 		 * original request's nofail setting here.
 		 */
-		err = ceph_osdc_start_request(osdc, req, true);
+		err = __ceph_osdc_start_request(osdc, req, true);
 		BUG_ON(err);
 
-		goto done;
+		goto out_unlock;
 	}
 
 	already_completed = req->r_got_reply;
@@ -1776,8 +1776,7 @@
 		req->r_got_reply = 1;
 	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
 		dout("handle_reply tid %llu dup ack\n", tid);
-		mutex_unlock(&osdc->request_mutex);
-		goto done;
+		goto out_unlock;
 	}
 
 	dout("handle_reply tid %llu flags %d\n", tid, flags);
@@ -1792,6 +1791,7 @@
 		__unregister_request(osdc, req);
 
 	mutex_unlock(&osdc->request_mutex);
+	up_read(&osdc->map_sem);
 
 	if (!already_completed) {
 		if (req->r_unsafe_callback &&
@@ -1809,10 +1809,14 @@
 		complete_request(req);
 	}
 
-done:
+out:
 	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
 	ceph_osdc_put_request(req);
 	return;
+out_unlock:
+	mutex_unlock(&osdc->request_mutex);
+	up_read(&osdc->map_sem);
+	goto out;
 
 bad_put:
 	req->r_result = -EIO;
@@ -1825,6 +1829,7 @@
 	ceph_osdc_put_request(req);
 bad_mutex:
 	mutex_unlock(&osdc->request_mutex);
+	up_read(&osdc->map_sem);
 bad:
 	pr_err("corrupt osd_op_reply got %d %d\n",
 	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));