RDMA/core: Export ib_open_qp() to share XRC TGT QPs

XRC TGT QPs are shared resources among multiple processes.  Since the
creating process may exit, allow other processes which share the same
XRC domain to open an existing QP.  This allows us to transfer
ownership of an XRC TGT QP to another process.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
Signed-off-by: Roland Dreier <roland@purestorage.com>
diff --git a/drivers/infiniband/core/uverbs_cmd.c b/drivers/infiniband/core/uverbs_cmd.c
index 9058e38..c4c308c 100644
--- a/drivers/infiniband/core/uverbs_cmd.c
+++ b/drivers/infiniband/core/uverbs_cmd.c
@@ -1463,6 +1463,7 @@
 	}
 
 	if (cmd.qp_type != IB_QPT_XRC_TGT) {
+		qp->real_qp	  = qp;
 		qp->device	  = device;
 		qp->pd		  = pd;
 		qp->send_cq	  = attr.send_cq;
@@ -1729,8 +1730,12 @@
 	attr->alt_ah_attr.ah_flags 	    = cmd.alt_dest.is_global ? IB_AH_GRH : 0;
 	attr->alt_ah_attr.port_num 	    = cmd.alt_dest.port_num;
 
-	ret = qp->device->modify_qp(qp, attr,
-				    modify_qp_mask(qp->qp_type, cmd.attr_mask), &udata);
+	if (qp->real_qp == qp) {
+		ret = qp->device->modify_qp(qp, attr,
+			modify_qp_mask(qp->qp_type, cmd.attr_mask), &udata);
+	} else {
+		ret = ib_modify_qp(qp, attr, modify_qp_mask(qp->qp_type, cmd.attr_mask));
+	}
 
 	put_qp_read(qp);
 
@@ -1927,7 +1932,7 @@
 	}
 
 	resp.bad_wr = 0;
-	ret = qp->device->post_send(qp, wr, &bad_wr);
+	ret = qp->device->post_send(qp->real_qp, wr, &bad_wr);
 	if (ret)
 		for (next = wr; next; next = next->next) {
 			++resp.bad_wr;
@@ -2065,7 +2070,7 @@
 		goto out;
 
 	resp.bad_wr = 0;
-	ret = qp->device->post_recv(qp, wr, &bad_wr);
+	ret = qp->device->post_recv(qp->real_qp, wr, &bad_wr);
 
 	put_qp_read(qp);
 
diff --git a/drivers/infiniband/core/uverbs_main.c b/drivers/infiniband/core/uverbs_main.c
index 0cb69e0..9c877e2 100644
--- a/drivers/infiniband/core/uverbs_main.c
+++ b/drivers/infiniband/core/uverbs_main.c
@@ -206,8 +206,8 @@
 			container_of(uobj, struct ib_uqp_object, uevent.uobject);
 
 		idr_remove_uobj(&ib_uverbs_qp_idr, uobj);
-		if (qp->qp_type == IB_QPT_XRC_TGT) {
-			ib_release_qp(qp);
+		if (qp != qp->real_qp) {
+			ib_close_qp(qp);
 		} else {
 			ib_uverbs_detach_umcast(qp, uqp);
 			ib_destroy_qp(qp);
diff --git a/drivers/infiniband/core/verbs.c b/drivers/infiniband/core/verbs.c
index a6d95e6..e02898bc 100644
--- a/drivers/infiniband/core/verbs.c
+++ b/drivers/infiniband/core/verbs.c
@@ -39,6 +39,7 @@
 #include <linux/errno.h>
 #include <linux/err.h>
 #include <linux/string.h>
+#include <linux/slab.h>
 
 #include <rdma/ib_verbs.h>
 #include <rdma/ib_cache.h>
@@ -316,6 +317,14 @@
 
 /* Queue pairs */
 
+static void __ib_shared_qp_event_handler(struct ib_event *event, void *context)
+{
+	struct ib_qp *qp = context;
+
+	list_for_each_entry(event->element.qp, &qp->open_list, open_list)
+		event->element.qp->event_handler(event, event->element.qp->qp_context);
+}
+
 static void __ib_insert_xrcd_qp(struct ib_xrcd *xrcd, struct ib_qp *qp)
 {
 	mutex_lock(&xrcd->tgt_qp_mutex);
@@ -323,33 +332,90 @@
 	mutex_unlock(&xrcd->tgt_qp_mutex);
 }
 
-static void __ib_remove_xrcd_qp(struct ib_xrcd *xrcd, struct ib_qp *qp)
+static struct ib_qp *__ib_open_qp(struct ib_qp *real_qp,
+				  void (*event_handler)(struct ib_event *, void *),
+				  void *qp_context)
 {
-	mutex_lock(&xrcd->tgt_qp_mutex);
-	list_del(&qp->xrcd_list);
-	mutex_unlock(&xrcd->tgt_qp_mutex);
+	struct ib_qp *qp;
+	unsigned long flags;
+
+	qp = kzalloc(sizeof *qp, GFP_KERNEL);
+	if (!qp)
+		return ERR_PTR(-ENOMEM);
+
+	qp->real_qp = real_qp;
+	atomic_inc(&real_qp->usecnt);
+	qp->device = real_qp->device;
+	qp->event_handler = event_handler;
+	qp->qp_context = qp_context;
+	qp->qp_num = real_qp->qp_num;
+	qp->qp_type = real_qp->qp_type;
+
+	spin_lock_irqsave(&real_qp->device->event_handler_lock, flags);
+	list_add(&qp->open_list, &real_qp->open_list);
+	spin_unlock_irqrestore(&real_qp->device->event_handler_lock, flags);
+
+	return qp;
 }
 
+struct ib_qp *ib_open_qp(struct ib_xrcd *xrcd,
+			 struct ib_qp_open_attr *qp_open_attr)
+{
+	struct ib_qp *qp, *real_qp;
+
+	if (qp_open_attr->qp_type != IB_QPT_XRC_TGT)
+		return ERR_PTR(-EINVAL);
+
+	qp = ERR_PTR(-EINVAL);
+	mutex_lock(&xrcd->tgt_qp_mutex);
+	list_for_each_entry(real_qp, &xrcd->tgt_qp_list, xrcd_list) {
+		if (real_qp->qp_num == qp_open_attr->qp_num) {
+			qp = __ib_open_qp(real_qp, qp_open_attr->event_handler,
+					  qp_open_attr->qp_context);
+			break;
+		}
+	}
+	mutex_unlock(&xrcd->tgt_qp_mutex);
+	return qp;
+}
+EXPORT_SYMBOL(ib_open_qp);
+
 struct ib_qp *ib_create_qp(struct ib_pd *pd,
 			   struct ib_qp_init_attr *qp_init_attr)
 {
-	struct ib_qp *qp;
+	struct ib_qp *qp, *real_qp;
 	struct ib_device *device;
 
 	device = pd ? pd->device : qp_init_attr->xrcd->device;
 	qp = device->create_qp(pd, qp_init_attr, NULL);
 
 	if (!IS_ERR(qp)) {
-		qp->device = device;
+		qp->device     = device;
+		qp->real_qp    = qp;
+		qp->uobject    = NULL;
+		qp->qp_type    = qp_init_attr->qp_type;
 
 		if (qp_init_attr->qp_type == IB_QPT_XRC_TGT) {
+			qp->event_handler = __ib_shared_qp_event_handler;
+			qp->qp_context = qp;
 			qp->pd = NULL;
 			qp->send_cq = qp->recv_cq = NULL;
 			qp->srq = NULL;
 			qp->xrcd = qp_init_attr->xrcd;
 			atomic_inc(&qp_init_attr->xrcd->usecnt);
-			__ib_insert_xrcd_qp(qp_init_attr->xrcd, qp);
+			INIT_LIST_HEAD(&qp->open_list);
+			atomic_set(&qp->usecnt, 0);
+
+			real_qp = qp;
+			qp = __ib_open_qp(real_qp, qp_init_attr->event_handler,
+					  qp_init_attr->qp_context);
+			if (!IS_ERR(qp))
+				__ib_insert_xrcd_qp(qp_init_attr->xrcd, real_qp);
+			else
+				real_qp->device->destroy_qp(real_qp);
 		} else {
+			qp->event_handler = qp_init_attr->event_handler;
+			qp->qp_context = qp_init_attr->qp_context;
 			if (qp_init_attr->qp_type == IB_QPT_XRC_INI) {
 				qp->recv_cq = NULL;
 				qp->srq = NULL;
@@ -368,11 +434,6 @@
 			atomic_inc(&pd->usecnt);
 			atomic_inc(&qp_init_attr->send_cq->usecnt);
 		}
-
-		qp->uobject       = NULL;
-		qp->event_handler = qp_init_attr->event_handler;
-		qp->qp_context    = qp_init_attr->qp_context;
-		qp->qp_type	  = qp_init_attr->qp_type;
 	}
 
 	return qp;
@@ -717,7 +778,7 @@
 		 struct ib_qp_attr *qp_attr,
 		 int qp_attr_mask)
 {
-	return qp->device->modify_qp(qp, qp_attr, qp_attr_mask, NULL);
+	return qp->device->modify_qp(qp->real_qp, qp_attr, qp_attr_mask, NULL);
 }
 EXPORT_SYMBOL(ib_modify_qp);
 
@@ -727,26 +788,76 @@
 		struct ib_qp_init_attr *qp_init_attr)
 {
 	return qp->device->query_qp ?
-		qp->device->query_qp(qp, qp_attr, qp_attr_mask, qp_init_attr) :
+		qp->device->query_qp(qp->real_qp, qp_attr, qp_attr_mask, qp_init_attr) :
 		-ENOSYS;
 }
 EXPORT_SYMBOL(ib_query_qp);
 
+int ib_close_qp(struct ib_qp *qp)
+{
+	struct ib_qp *real_qp;
+	unsigned long flags;
+
+	real_qp = qp->real_qp;
+	if (real_qp == qp)
+		return -EINVAL;
+
+	spin_lock_irqsave(&real_qp->device->event_handler_lock, flags);
+	list_del(&qp->open_list);
+	spin_unlock_irqrestore(&real_qp->device->event_handler_lock, flags);
+
+	atomic_dec(&real_qp->usecnt);
+	kfree(qp);
+
+	return 0;
+}
+EXPORT_SYMBOL(ib_close_qp);
+
+static int __ib_destroy_shared_qp(struct ib_qp *qp)
+{
+	struct ib_xrcd *xrcd;
+	struct ib_qp *real_qp;
+	int ret;
+
+	real_qp = qp->real_qp;
+	xrcd = real_qp->xrcd;
+
+	mutex_lock(&xrcd->tgt_qp_mutex);
+	ib_close_qp(qp);
+	if (atomic_read(&real_qp->usecnt) == 0)
+		list_del(&real_qp->xrcd_list);
+	else
+		real_qp = NULL;
+	mutex_unlock(&xrcd->tgt_qp_mutex);
+
+	if (real_qp) {
+		ret = ib_destroy_qp(real_qp);
+		if (!ret)
+			atomic_dec(&xrcd->usecnt);
+		else
+			__ib_insert_xrcd_qp(xrcd, real_qp);
+	}
+
+	return 0;
+}
+
 int ib_destroy_qp(struct ib_qp *qp)
 {
 	struct ib_pd *pd;
 	struct ib_cq *scq, *rcq;
 	struct ib_srq *srq;
-	struct ib_xrcd *xrcd;
 	int ret;
 
+	if (atomic_read(&qp->usecnt))
+		return -EBUSY;
+
+	if (qp->real_qp != qp)
+		return __ib_destroy_shared_qp(qp);
+
 	pd   = qp->pd;
 	scq  = qp->send_cq;
 	rcq  = qp->recv_cq;
 	srq  = qp->srq;
-	xrcd = qp->xrcd;
-	if (xrcd)
-		__ib_remove_xrcd_qp(xrcd, qp);
 
 	ret = qp->device->destroy_qp(qp);
 	if (!ret) {
@@ -758,32 +869,12 @@
 			atomic_dec(&rcq->usecnt);
 		if (srq)
 			atomic_dec(&srq->usecnt);
-		if (xrcd)
-			atomic_dec(&xrcd->usecnt);
-	} else if (xrcd) {
-		__ib_insert_xrcd_qp(xrcd, qp);
 	}
 
 	return ret;
 }
 EXPORT_SYMBOL(ib_destroy_qp);
 
-int ib_release_qp(struct ib_qp *qp)
-{
-	unsigned long flags;
-
-	if (qp->qp_type != IB_QPT_XRC_TGT)
-		return -EINVAL;
-
-	spin_lock_irqsave(&qp->device->event_handler_lock, flags);
-	qp->event_handler = NULL;
-	spin_unlock_irqrestore(&qp->device->event_handler_lock, flags);
-
-	atomic_dec(&qp->xrcd->usecnt);
-	return 0;
-}
-EXPORT_SYMBOL(ib_release_qp);
-
 /* Completion queues */
 
 struct ib_cq *ib_create_cq(struct ib_device *device,