NTB: Remove reads across NTB

CPU reads across NTB are slow(er) and can hang the local system if an
ECC error is encountered on the remote.  To work around the need for a
read, have the remote side write its current position in the rx buffer
to the local system's buffer and use that to see if there is room when
transmitting.

Signed-off-by: Jon Mason <jon.mason@intel.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
diff --git a/drivers/ntb/ntb_transport.c b/drivers/ntb/ntb_transport.c
index 1bed1ba..69c58da 100644
--- a/drivers/ntb/ntb_transport.c
+++ b/drivers/ntb/ntb_transport.c
@@ -78,6 +78,10 @@
 	unsigned int flags;
 };
 
+struct ntb_rx_info {
+	unsigned int entry;
+};
+
 struct ntb_transport_qp {
 	struct ntb_transport *transport;
 	struct ntb_device *ndev;
@@ -87,13 +91,16 @@
 	bool qp_link;
 	u8 qp_num;	/* Only 64 QP's are allowed.  0-63 */
 
+	struct ntb_rx_info *rx_info;
+	struct ntb_rx_info *remote_rx_info;
+
 	void (*tx_handler) (struct ntb_transport_qp *qp, void *qp_data,
 			    void *data, int len);
 	struct list_head tx_free_q;
 	spinlock_t ntb_tx_free_q_lock;
-	void *tx_mw_begin;
-	void *tx_mw_end;
-	void *tx_offset;
+	void *tx_mw;
+	unsigned int tx_index;
+	unsigned int tx_max_entry;
 	unsigned int tx_max_frame;
 
 	void (*rx_handler) (struct ntb_transport_qp *qp, void *qp_data,
@@ -103,9 +110,9 @@
 	struct list_head rx_free_q;
 	spinlock_t ntb_rx_pend_q_lock;
 	spinlock_t ntb_rx_free_q_lock;
-	void *rx_buff_begin;
-	void *rx_buff_end;
-	void *rx_offset;
+	void *rx_buff;
+	unsigned int rx_index;
+	unsigned int rx_max_entry;
 	unsigned int rx_max_frame;
 
 	void (*event_handler) (void *data, int status);
@@ -394,11 +401,11 @@
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
 			       "rx_err_ver - \t%llu\n", qp->rx_err_ver);
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
-			       "rx_buff_begin - %p\n", qp->rx_buff_begin);
+			       "rx_buff - \t%p\n", qp->rx_buff);
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
-			       "rx_offset - \t%p\n", qp->rx_offset);
+			       "rx_index - \t%u\n", qp->rx_index);
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
-			       "rx_buff_end - \t%p\n", qp->rx_buff_end);
+			       "rx_max_entry - \t%u\n", qp->rx_max_entry);
 
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
 			       "tx_bytes - \t%llu\n", qp->tx_bytes);
@@ -407,11 +414,11 @@
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
 			       "tx_ring_full - \t%llu\n", qp->tx_ring_full);
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
-			       "tx_mw_begin - \t%p\n", qp->tx_mw_begin);
+			       "tx_mw - \t%p\n", qp->tx_mw);
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
-			       "tx_offset - \t%p\n", qp->tx_offset);
+			       "tx_index - \t%u\n", qp->tx_index);
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
-			       "tx_mw_end - \t%p\n", qp->tx_mw_end);
+			       "tx_max_entry - \t%u\n", qp->tx_max_entry);
 
 	out_offset += snprintf(buf + out_offset, out_count - out_offset,
 			       "\nQP Link %s\n", (qp->qp_link == NTB_LINK_UP) ?
@@ -465,7 +472,7 @@
 	struct ntb_transport_qp *qp = &nt->qps[qp_num];
 	unsigned int rx_size, num_qps_mw;
 	u8 mw_num = QP_TO_MW(qp_num);
-	void *offset;
+	unsigned int i;
 
 	WARN_ON(nt->mw[mw_num].virt_addr == 0);
 
@@ -474,18 +481,24 @@
 	else
 		num_qps_mw = nt->max_qps / NTB_NUM_MW;
 
-	rx_size = nt->mw[mw_num].size / num_qps_mw;
-	qp->rx_buff_begin = nt->mw[mw_num].virt_addr +
-			    (qp_num / NTB_NUM_MW * rx_size);
-	qp->rx_buff_end = qp->rx_buff_begin + rx_size;
-	qp->rx_offset = qp->rx_buff_begin;
+	rx_size = (unsigned int) nt->mw[mw_num].size / num_qps_mw;
+	qp->remote_rx_info = nt->mw[mw_num].virt_addr +
+			     (qp_num / NTB_NUM_MW * rx_size);
+	rx_size -= sizeof(struct ntb_rx_info);
+
+	qp->rx_buff = qp->remote_rx_info + sizeof(struct ntb_rx_info);
 	qp->rx_max_frame = min(transport_mtu, rx_size);
+	qp->rx_max_entry = rx_size / qp->rx_max_frame;
+	qp->rx_index = 0;
+
+	qp->remote_rx_info->entry = qp->rx_max_entry;
 
 	/* setup the hdr offsets with 0's */
-	for (offset = qp->rx_buff_begin + qp->rx_max_frame -
-		      sizeof(struct ntb_payload_header);
-	     offset < qp->rx_buff_end; offset += qp->rx_max_frame)
+	for (i = 0; i < qp->rx_max_entry; i++) {
+		void *offset = qp->rx_buff + qp->rx_max_frame * (i + 1) -
+			       sizeof(struct ntb_payload_header);
 		memset(offset, 0, sizeof(struct ntb_payload_header));
+	}
 
 	qp->rx_pkts = 0;
 	qp->tx_pkts = 0;
@@ -762,12 +775,15 @@
 	else
 		num_qps_mw = nt->max_qps / NTB_NUM_MW;
 
-	tx_size = ntb_get_mw_size(qp->ndev, mw_num) / num_qps_mw;
-	qp->tx_mw_begin = ntb_get_mw_vbase(nt->ndev, mw_num) +
-			  (qp_num / NTB_NUM_MW * tx_size);
-	qp->tx_mw_end = qp->tx_mw_begin + tx_size;
-	qp->tx_offset = qp->tx_mw_begin;
+	tx_size = (unsigned int) ntb_get_mw_size(qp->ndev, mw_num) / num_qps_mw;
+	qp->rx_info = ntb_get_mw_vbase(nt->ndev, mw_num) +
+		      (qp_num / NTB_NUM_MW * tx_size);
+	tx_size -= sizeof(struct ntb_rx_info);
+
+	qp->tx_mw = qp->rx_info + sizeof(struct ntb_rx_info);
 	qp->tx_max_frame = min(transport_mtu, tx_size);
+	qp->tx_max_entry = tx_size / qp->tx_max_frame;
+	qp->tx_index = 0;
 
 	if (nt->debugfs_dir) {
 		char debugfs_name[4];
@@ -894,21 +910,8 @@
 static void ntb_rx_copy_task(struct ntb_transport_qp *qp,
 			     struct ntb_queue_entry *entry, void *offset)
 {
-
-	struct ntb_payload_header *hdr;
-
-	BUG_ON(offset < qp->rx_buff_begin ||
-	       offset + qp->rx_max_frame >= qp->rx_buff_end);
-
-	hdr = offset + qp->rx_max_frame - sizeof(struct ntb_payload_header);
-	entry->len = hdr->len;
-
 	memcpy(entry->buf, offset, entry->len);
 
-	/* Ensure that the data is fully copied out before clearing the flag */
-	wmb();
-	hdr->flags = 0;
-
 	if (qp->rx_handler && qp->client_ready == NTB_LINK_UP)
 		qp->rx_handler(qp, qp->cb_data, entry->cb_data, entry->len);
 
@@ -921,10 +924,11 @@
 	struct ntb_queue_entry *entry;
 	void *offset;
 
+	offset = qp->rx_buff + qp->rx_max_frame * qp->rx_index;
+	hdr = offset + qp->rx_max_frame - sizeof(struct ntb_payload_header);
+
 	entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
 	if (!entry) {
-		hdr = offset + qp->rx_max_frame -
-		      sizeof(struct ntb_payload_header);
 		dev_dbg(&ntb_query_pdev(qp->ndev)->dev,
 			"no buffer - HDR ver %llu, len %d, flags %x\n",
 			hdr->ver, hdr->len, hdr->flags);
@@ -932,9 +936,6 @@
 		return -ENOMEM;
 	}
 
-	offset = qp->rx_offset;
-	hdr = offset + qp->rx_max_frame - sizeof(struct ntb_payload_header);
-
 	if (!(hdr->flags & DESC_DONE_FLAG)) {
 		ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry,
 			     &qp->rx_pend_q);
@@ -957,30 +958,20 @@
 
 		ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry,
 			     &qp->rx_pend_q);
-
-		/* Ensure that the data is fully copied out before clearing the
-		 * done flag
-		 */
-		wmb();
-		hdr->flags = 0;
 		goto out;
 	}
 
 	dev_dbg(&ntb_query_pdev(qp->ndev)->dev,
-		"rx offset %p, ver %llu - %d payload received, buf size %d\n",
-		qp->rx_offset, hdr->ver, hdr->len, entry->len);
+		"rx offset %u, ver %llu - %d payload received, buf size %d\n",
+		qp->rx_index, hdr->ver, hdr->len, entry->len);
 
-	if (hdr->len <= entry->len)
+	if (hdr->len <= entry->len) {
+		entry->len = hdr->len;
 		ntb_rx_copy_task(qp, entry, offset);
-	else {
+	} else {
 		ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry,
 			     &qp->rx_pend_q);
 
-		/* Ensure that the data is fully copied out before clearing the
-		 * done flag
-		 */
-		wmb();
-		hdr->flags = 0;
 		qp->rx_err_oflow++;
 		dev_dbg(&ntb_query_pdev(qp->ndev)->dev,
 			"RX overflow! Wanted %d got %d\n",
@@ -991,9 +982,13 @@
 	qp->rx_pkts++;
 
 out:
-	qp->rx_offset += qp->rx_max_frame;
-	if (qp->rx_offset + qp->rx_max_frame >= qp->rx_buff_end)
-		qp->rx_offset = qp->rx_buff_begin;
+	/* Ensure that the data is fully copied out before clearing the flag */
+	wmb();
+	hdr->flags = 0;
+	qp->rx_info->entry = qp->rx_index;
+
+	qp->rx_index++;
+	qp->rx_index %= qp->rx_max_entry;
 
 	return 0;
 }
@@ -1024,9 +1019,6 @@
 {
 	struct ntb_payload_header *hdr;
 
-	BUG_ON(offset < qp->tx_mw_begin ||
-	       offset + qp->tx_max_frame >= qp->tx_mw_end);
-
 	memcpy_toio(offset, entry->buf, entry->len);
 
 	hdr = offset + qp->tx_max_frame - sizeof(struct ntb_payload_header);
@@ -1057,16 +1049,14 @@
 static int ntb_process_tx(struct ntb_transport_qp *qp,
 			  struct ntb_queue_entry *entry)
 {
-	struct ntb_payload_header *hdr;
 	void *offset;
 
-	offset = qp->tx_offset;
-	hdr = offset + qp->tx_max_frame - sizeof(struct ntb_payload_header);
+	offset = qp->tx_mw + qp->tx_max_frame * qp->tx_index;
 
-	dev_dbg(&ntb_query_pdev(qp->ndev)->dev, "%lld - offset %p, tx %p, entry len %d flags %x buff %p\n",
-		qp->tx_pkts, offset, qp->tx_offset, entry->len, entry->flags,
+	dev_dbg(&ntb_query_pdev(qp->ndev)->dev, "%lld - offset %p, tx %u, entry len %d flags %x buff %p\n",
+		qp->tx_pkts, offset, qp->tx_index, entry->len, entry->flags,
 		entry->buf);
-	if (hdr->flags) {
+	if (qp->tx_index == qp->remote_rx_info->entry) {
 		qp->tx_ring_full++;
 		return -EAGAIN;
 	}
@@ -1082,9 +1072,8 @@
 
 	ntb_tx_copy_task(qp, entry, offset);
 
-	qp->tx_offset += qp->tx_max_frame;
-	if (qp->tx_offset + qp->tx_max_frame >= qp->tx_mw_end)
-		qp->tx_offset = qp->tx_mw_begin;
+	qp->tx_index++;
+	qp->tx_index %= qp->tx_max_entry;
 
 	qp->tx_pkts++;