drbd: remove struct drbd_tl_epoch objects (barrier works)

cherry-picked and adapted from drbd 9 devel branch

DRBD requests (struct drbd_request) are already on the per resource
transfer log list, and carry their epoch number. We do not need to
additionally link them on other ring lists in other structs.

The drbd sender thread can recognize itself when to send a P_BARRIER,
by tracking the currently processed epoch, and how many writes
have been processed for that epoch.

If the epoch of the request to be processed does not match the currently
processed epoch, any writes have been processed in it, a P_BARRIER for
this last processed epoch is send out first.
The new epoch then becomes the currently processed epoch.

To not get stuck in drbd_al_begin_io() waiting for P_BARRIER_ACK,
the sender thread also needs to handle the case when the current
epoch was closed already, but no new requests are queued yet,
and send out P_BARRIER as soon as possible.

This is done by comparing the per resource "current transfer log epoch"
(tconn->current_tle_nr) with the per connection "currently processed
epoch number" (tconn->send.current_epoch_nr), while waiting for
new requests to be processed in wait_for_work().

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 39ece3a..66be391 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1210,34 +1210,25 @@
 	return 0;
 }
 
-int w_send_barrier(struct drbd_work *w, int cancel)
+/* FIXME
+ * We need to track the number of pending barrier acks,
+ * and to be able to wait for them.
+ * See also comment in drbd_adm_attach before drbd_suspend_io.
+ */
+int drbd_send_barrier(struct drbd_tconn *tconn)
 {
-	struct drbd_socket *sock;
-	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
-	struct drbd_conf *mdev = w->mdev;
 	struct p_barrier *p;
+	struct drbd_socket *sock;
 
-	/* really avoid racing with tl_clear.  w.cb may have been referenced
-	 * just before it was reassigned and re-queued, so double check that.
-	 * actually, this race was harmless, since we only try to send the
-	 * barrier packet here, and otherwise do nothing with the object.
-	 * but compare with the head of w_clear_epoch */
-	spin_lock_irq(&mdev->tconn->req_lock);
-	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
-		cancel = 1;
-	spin_unlock_irq(&mdev->tconn->req_lock);
-	if (cancel)
-		return 0;
-
-	sock = &mdev->tconn->data;
-	p = drbd_prepare_command(mdev, sock);
+	sock = &tconn->data;
+	p = conn_prepare_command(tconn, sock);
 	if (!p)
 		return -EIO;
-	p->barrier = b->br_number;
-	/* inc_ap_pending was done where this was queued.
-	 * dec_ap_pending will be done in got_BarrierAck
-	 * or (on connection loss) in w_clear_epoch.  */
-	return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
+	p->barrier = tconn->send.current_epoch_nr;
+	p->pad = 0;
+	tconn->send.current_epoch_writes = 0;
+
+	return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
 }
 
 int w_send_write_hint(struct drbd_work *w, int cancel)
@@ -1257,6 +1248,7 @@
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_conf *mdev = w->mdev;
+	struct drbd_tconn *tconn = mdev->tconn;
 	int err;
 
 	if (unlikely(cancel)) {
@@ -1264,6 +1256,20 @@
 		return 0;
 	}
 
+	if (!tconn->send.seen_any_write_yet) {
+		tconn->send.seen_any_write_yet = true;
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	if (tconn->send.current_epoch_nr != req->epoch) {
+		if (tconn->send.current_epoch_writes)
+			drbd_send_barrier(tconn);
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	/* this time, no tconn->send.current_epoch_writes++;
+	 * If it was sent, it was the closing barrier for the last
+	 * replicated epoch, before we went into AHEAD mode.
+	 * No more barriers will be sent, until we leave AHEAD mode again. */
+
 	err = drbd_send_out_of_sync(mdev, req);
 	req_mod(req, OOS_HANDED_TO_NETWORK);
 
@@ -1280,6 +1286,7 @@
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_conf *mdev = w->mdev;
+	struct drbd_tconn *tconn = mdev->tconn;
 	int err;
 
 	if (unlikely(cancel)) {
@@ -1287,6 +1294,17 @@
 		return 0;
 	}
 
+	if (!tconn->send.seen_any_write_yet) {
+		tconn->send.seen_any_write_yet = true;
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	if (tconn->send.current_epoch_nr != req->epoch) {
+		if (tconn->send.current_epoch_writes)
+			drbd_send_barrier(tconn);
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	tconn->send.current_epoch_writes++;
+
 	err = drbd_send_dblock(mdev, req);
 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
 
@@ -1303,6 +1321,7 @@
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_conf *mdev = w->mdev;
+	struct drbd_tconn *tconn = mdev->tconn;
 	int err;
 
 	if (unlikely(cancel)) {
@@ -1310,6 +1329,15 @@
 		return 0;
 	}
 
+	/* Even read requests may close a write epoch,
+	 * if there was any yet. */
+	if (tconn->send.seen_any_write_yet &&
+	    tconn->send.current_epoch_nr != req->epoch) {
+		if (tconn->send.current_epoch_writes)
+			drbd_send_barrier(tconn);
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+
 	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
 				 (unsigned long)req);
 
@@ -1673,6 +1701,34 @@
 	mutex_unlock(mdev->state_mutex);
 }
 
+/* If the resource already closed the current epoch, but we did not
+ * (because we have not yet seen new requests), we should send the
+ * corresponding barrier now.  Must be checked within the same spinlock
+ * that is used to check for new requests. */
+bool need_to_send_barrier(struct drbd_tconn *connection)
+{
+	if (!connection->send.seen_any_write_yet)
+		return false;
+
+	/* Skip barriers that do not contain any writes.
+	 * This may happen during AHEAD mode. */
+	if (!connection->send.current_epoch_writes)
+		return false;
+
+	/* ->req_lock is held when requests are queued on
+	 * connection->sender_work, and put into ->transfer_log.
+	 * It is also held when ->current_tle_nr is increased.
+	 * So either there are already new requests queued,
+	 * and corresponding barriers will be send there.
+	 * Or nothing new is queued yet, so the difference will be 1.
+	 */
+	if (atomic_read(&connection->current_tle_nr) !=
+	    connection->send.current_epoch_nr + 1)
+		return false;
+
+	return true;
+}
+
 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
 {
 	spin_lock_irq(&queue->q_lock);
@@ -1690,15 +1746,79 @@
 	return !list_empty(work_list);
 }
 
+void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
+{
+	DEFINE_WAIT(wait);
+	struct net_conf *nc;
+	int uncork, cork;
+
+	dequeue_work_item(&connection->sender_work, work_list);
+	if (!list_empty(work_list))
+		return;
+
+	/* Still nothing to do?
+	 * Maybe we still need to close the current epoch,
+	 * even if no new requests are queued yet.
+	 *
+	 * Also, poke TCP, just in case.
+	 * Then wait for new work (or signal). */
+	rcu_read_lock();
+	nc = rcu_dereference(connection->net_conf);
+	uncork = nc ? nc->tcp_cork : 0;
+	rcu_read_unlock();
+	if (uncork) {
+		mutex_lock(&connection->data.mutex);
+		if (connection->data.socket)
+			drbd_tcp_uncork(connection->data.socket);
+		mutex_unlock(&connection->data.mutex);
+	}
+
+	for (;;) {
+		int send_barrier;
+		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
+		spin_lock_irq(&connection->req_lock);
+		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
+		list_splice_init(&connection->sender_work.q, work_list);
+		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
+		if (!list_empty(work_list) || signal_pending(current)) {
+			spin_unlock_irq(&connection->req_lock);
+			break;
+		}
+		send_barrier = need_to_send_barrier(connection);
+		spin_unlock_irq(&connection->req_lock);
+		if (send_barrier) {
+			drbd_send_barrier(connection);
+			connection->send.current_epoch_nr++;
+		}
+		schedule();
+		/* may be woken up for other things but new work, too,
+		 * e.g. if the current epoch got closed.
+		 * In which case we send the barrier above. */
+	}
+	finish_wait(&connection->sender_work.q_wait, &wait);
+
+	/* someone may have changed the config while we have been waiting above. */
+	rcu_read_lock();
+	nc = rcu_dereference(connection->net_conf);
+	cork = nc ? nc->tcp_cork : 0;
+	rcu_read_unlock();
+	mutex_lock(&connection->data.mutex);
+	if (connection->data.socket) {
+		if (cork)
+			drbd_tcp_cork(connection->data.socket);
+		else if (!uncork)
+			drbd_tcp_uncork(connection->data.socket);
+	}
+	mutex_unlock(&connection->data.mutex);
+}
+
 int drbd_worker(struct drbd_thread *thi)
 {
 	struct drbd_tconn *tconn = thi->tconn;
 	struct drbd_work *w = NULL;
 	struct drbd_conf *mdev;
-	struct net_conf *nc;
 	LIST_HEAD(work_list);
 	int vnr;
-	int cork;
 
 	while (get_t_state(thi) == RUNNING) {
 		drbd_thread_current_set_cpu(thi);
@@ -1706,29 +1826,7 @@
 		/* as long as we use drbd_queue_work_front(),
 		 * we may only dequeue single work items here, not batches. */
 		if (list_empty(&work_list))
-			dequeue_work_item(&tconn->sender_work, &work_list);
-
-		/* Still nothing to do? Poke TCP, just in case,
-		 * then wait for new work (or signal). */
-		if (list_empty(&work_list)) {
-			mutex_lock(&tconn->data.mutex);
-			rcu_read_lock();
-			nc = rcu_dereference(tconn->net_conf);
-			cork = nc ? nc->tcp_cork : 0;
-			rcu_read_unlock();
-
-			if (tconn->data.socket && cork)
-				drbd_tcp_uncork(tconn->data.socket);
-			mutex_unlock(&tconn->data.mutex);
-
-			wait_event_interruptible(tconn->sender_work.q_wait,
-				dequeue_work_item(&tconn->sender_work, &work_list));
-
-			mutex_lock(&tconn->data.mutex);
-			if (tconn->data.socket && cork)
-				drbd_tcp_cork(tconn->data.socket);
-			mutex_unlock(&tconn->data.mutex);
-		}
+			wait_for_work(tconn, &work_list);
 
 		if (signal_pending(current)) {
 			flush_signals(current);