drbd: remove struct drbd_tl_epoch objects (barrier works)
cherry-picked and adapted from drbd 9 devel branch
DRBD requests (struct drbd_request) are already on the per resource
transfer log list, and carry their epoch number. We do not need to
additionally link them on other ring lists in other structs.
The drbd sender thread can recognize itself when to send a P_BARRIER,
by tracking the currently processed epoch, and how many writes
have been processed for that epoch.
If the epoch of the request to be processed does not match the currently
processed epoch, any writes have been processed in it, a P_BARRIER for
this last processed epoch is send out first.
The new epoch then becomes the currently processed epoch.
To not get stuck in drbd_al_begin_io() waiting for P_BARRIER_ACK,
the sender thread also needs to handle the case when the current
epoch was closed already, but no new requests are queued yet,
and send out P_BARRIER as soon as possible.
This is done by comparing the per resource "current transfer log epoch"
(tconn->current_tle_nr) with the per connection "currently processed
epoch number" (tconn->send.current_epoch_nr), while waiting for
new requests to be processed in wait_for_work().
Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index c0d0de5..309c121 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -562,12 +562,16 @@
struct bio *private_bio;
struct drbd_interval i;
- unsigned int epoch; /* barrier_nr */
- /* barrier_nr: used to check on "completion" whether this req was in
+ /* epoch: used to check on "completion" whether this req was in
* the current epoch, and we therefore have to close it,
- * starting a new epoch...
+ * causing a p_barrier packet to be send, starting a new epoch.
+ *
+ * This corresponds to "barrier" in struct p_barrier[_ack],
+ * and to "barrier_nr" in struct drbd_epoch (and various
+ * comments/function parameters/local variable names).
*/
+ unsigned int epoch;
struct list_head tl_requests; /* ring list in the transfer log */
struct bio *master_bio; /* master bio pointer */
@@ -575,14 +579,6 @@
unsigned long start_time;
};
-struct drbd_tl_epoch {
- struct drbd_work w;
- struct list_head requests; /* requests before */
- struct drbd_tl_epoch *next; /* pointer to the next barrier */
- unsigned int br_number; /* the barriers identifier. */
- int n_writes; /* number of requests attached before this barrier */
-};
-
struct drbd_epoch {
struct drbd_tconn *tconn;
struct list_head list;
@@ -845,11 +841,8 @@
unsigned int ko_count;
spinlock_t req_lock;
- struct drbd_tl_epoch *unused_spare_tle; /* for pre-allocation */
- struct drbd_tl_epoch *newest_tle;
- struct drbd_tl_epoch *oldest_tle;
- struct list_head out_of_sequence_requests;
- struct list_head barrier_acked_requests;
+
+ struct list_head transfer_log; /* all requests not yet fully processed */
struct crypto_hash *cram_hmac_tfm;
struct crypto_hash *integrity_tfm; /* checksums we compute, updates protected by tconn->data->mutex */
@@ -859,18 +852,36 @@
void *int_dig_in;
void *int_dig_vv;
+ /* receiver side */
struct drbd_epoch *current_epoch;
spinlock_t epoch_lock;
unsigned int epochs;
enum write_ordering_e write_ordering;
atomic_t current_tle_nr; /* transfer log epoch number */
+ unsigned current_tle_writes; /* writes seen within this tl epoch */
unsigned long last_reconnect_jif;
struct drbd_thread receiver;
struct drbd_thread worker;
struct drbd_thread asender;
cpumask_var_t cpu_mask;
+
+ /* sender side */
struct drbd_work_queue sender_work;
+
+ struct {
+ /* whether this sender thread
+ * has processed a single write yet. */
+ bool seen_any_write_yet;
+
+ /* Which barrier number to send with the next P_BARRIER */
+ int current_epoch_nr;
+
+ /* how many write requests have been sent
+ * with req->epoch == current_epoch_nr.
+ * If none, no P_BARRIER will be sent. */
+ unsigned current_epoch_writes;
+ } send;
};
struct drbd_conf {
@@ -1054,7 +1065,6 @@
extern void tl_release(struct drbd_tconn *, unsigned int barrier_nr,
unsigned int set_size);
extern void tl_clear(struct drbd_tconn *);
-extern void _tl_add_barrier(struct drbd_tconn *, struct drbd_tl_epoch *);
extern void drbd_free_sock(struct drbd_tconn *tconn);
extern int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
void *buf, size_t size, unsigned msg_flags);
@@ -1460,7 +1470,6 @@
extern int w_send_write_hint(struct drbd_work *, int);
extern int w_make_resync_request(struct drbd_work *, int);
extern int w_send_dblock(struct drbd_work *, int);
-extern int w_send_barrier(struct drbd_work *, int);
extern int w_send_read_req(struct drbd_work *, int);
extern int w_prev_work_done(struct drbd_work *, int);
extern int w_e_reissue(struct drbd_work *, int);
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 7e37149..8c6c48e3 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -188,147 +188,75 @@
#endif
/**
- * DOC: The transfer log
- *
- * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
- * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
- * of the list. There is always at least one &struct drbd_tl_epoch object.
- *
- * Each &struct drbd_tl_epoch has a circular double linked list of requests
- * attached.
- */
-static int tl_init(struct drbd_tconn *tconn)
-{
- struct drbd_tl_epoch *b;
-
- /* during device minor initialization, we may well use GFP_KERNEL */
- b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
- if (!b)
- return 0;
- INIT_LIST_HEAD(&b->requests);
- INIT_LIST_HEAD(&b->w.list);
- b->next = NULL;
- b->br_number = atomic_inc_return(&tconn->current_tle_nr);
- b->n_writes = 0;
- b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-
- tconn->oldest_tle = b;
- tconn->newest_tle = b;
- INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
- INIT_LIST_HEAD(&tconn->barrier_acked_requests);
-
- return 1;
-}
-
-static void tl_cleanup(struct drbd_tconn *tconn)
-{
- if (tconn->oldest_tle != tconn->newest_tle)
- conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
- if (!list_empty(&tconn->out_of_sequence_requests))
- conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
- kfree(tconn->oldest_tle);
- tconn->oldest_tle = NULL;
- kfree(tconn->unused_spare_tle);
- tconn->unused_spare_tle = NULL;
-}
-
-/**
- * _tl_add_barrier() - Adds a barrier to the transfer log
- * @mdev: DRBD device.
- * @new: Barrier to be added before the current head of the TL.
- *
- * The caller must hold the req_lock.
- */
-void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
-{
- INIT_LIST_HEAD(&new->requests);
- INIT_LIST_HEAD(&new->w.list);
- new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
- new->next = NULL;
- new->n_writes = 0;
-
- new->br_number = atomic_inc_return(&tconn->current_tle_nr);
- if (tconn->newest_tle != new) {
- tconn->newest_tle->next = new;
- tconn->newest_tle = new;
- }
-}
-
-/**
- * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
- * @mdev: DRBD device.
+ * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
+ * @tconn: DRBD connection.
* @barrier_nr: Expected identifier of the DRBD write barrier packet.
* @set_size: Expected number of requests before that barrier.
*
* In case the passed barrier_nr or set_size does not match the oldest
- * &struct drbd_tl_epoch objects this function will cause a termination
- * of the connection.
+ * epoch of not yet barrier-acked requests, this function will cause a
+ * termination of the connection.
*/
void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
unsigned int set_size)
{
- struct drbd_conf *mdev;
- struct drbd_tl_epoch *b, *nob; /* next old barrier */
- struct list_head *le, *tle;
struct drbd_request *r;
+ struct drbd_request *req = NULL;
+ int expect_epoch = 0;
+ int expect_size = 0;
spin_lock_irq(&tconn->req_lock);
- b = tconn->oldest_tle;
+ /* find latest not yet barrier-acked write request,
+ * count writes in its epoch. */
+ list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+ const unsigned long s = r->rq_state;
+ if (!req) {
+ if (!(s & RQ_WRITE))
+ continue;
+ if (!(s & RQ_NET_MASK))
+ continue;
+ if (s & RQ_NET_DONE)
+ continue;
+ req = r;
+ expect_epoch = req->epoch;
+ expect_size ++;
+ } else {
+ if (r->epoch != expect_epoch)
+ break;
+ if (!(s & RQ_WRITE))
+ continue;
+ /* if (s & RQ_DONE): not expected */
+ /* if (!(s & RQ_NET_MASK)): not expected */
+ expect_size++;
+ }
+ }
/* first some paranoia code */
- if (b == NULL) {
+ if (req == NULL) {
conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
barrier_nr);
goto bail;
}
- if (b->br_number != barrier_nr) {
+ if (expect_epoch != barrier_nr) {
conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
- barrier_nr, b->br_number);
+ barrier_nr, expect_epoch);
goto bail;
}
- if (b->n_writes != set_size) {
+
+ if (expect_size != set_size) {
conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
- barrier_nr, set_size, b->n_writes);
+ barrier_nr, set_size, expect_size);
goto bail;
}
/* Clean up list of requests processed during current epoch */
- list_for_each_safe(le, tle, &b->requests) {
- r = list_entry(le, struct drbd_request, tl_requests);
- _req_mod(r, BARRIER_ACKED);
+ list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
+ if (req->epoch != expect_epoch)
+ break;
+ _req_mod(req, BARRIER_ACKED);
}
- /* There could be requests on the list waiting for completion
- of the write to the local disk. To avoid corruptions of
- slab's data structures we have to remove the lists head.
-
- Also there could have been a barrier ack out of sequence, overtaking
- the write acks - which would be a bug and violating write ordering.
- To not deadlock in case we lose connection while such requests are
- still pending, we need some way to find them for the
- _req_mode(CONNECTION_LOST_WHILE_PENDING).
-
- These have been list_move'd to the out_of_sequence_requests list in
- _req_mod(, BARRIER_ACKED) above.
- */
- list_splice_init(&b->requests, &tconn->barrier_acked_requests);
- mdev = b->w.mdev;
-
- nob = b->next;
- if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
- _tl_add_barrier(tconn, b);
- if (nob)
- tconn->oldest_tle = nob;
- /* if nob == NULL b was the only barrier, and becomes the new
- barrier. Therefore tconn->oldest_tle points already to b */
- } else {
- D_ASSERT(nob != NULL);
- tconn->oldest_tle = nob;
- kfree(b);
- }
-
spin_unlock_irq(&tconn->req_lock);
- dec_ap_pending(mdev);
return;
@@ -346,91 +274,20 @@
* @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
* RESTART_FROZEN_DISK_IO.
*/
+/* must hold resource->req_lock */
void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
{
- struct drbd_tl_epoch *b, *tmp, **pn;
- struct list_head *le, *tle, carry_reads;
- struct drbd_request *req;
- int rv, n_writes, n_reads;
+ struct drbd_request *req, *r;
- b = tconn->oldest_tle;
- pn = &tconn->oldest_tle;
- while (b) {
- n_writes = 0;
- n_reads = 0;
- INIT_LIST_HEAD(&carry_reads);
- list_for_each_safe(le, tle, &b->requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
- rv = _req_mod(req, what);
+ list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests)
+ _req_mod(req, what);
+}
- if (rv & MR_WRITE)
- n_writes++;
- if (rv & MR_READ)
- n_reads++;
- }
- tmp = b->next;
-
- if (n_writes) {
- if (what == RESEND) {
- b->n_writes = n_writes;
- if (b->w.cb == NULL) {
- b->w.cb = w_send_barrier;
- inc_ap_pending(b->w.mdev);
- set_bit(CREATE_BARRIER, &tconn->flags);
- }
-
- drbd_queue_work(&tconn->sender_work, &b->w);
- }
- pn = &b->next;
- } else {
- if (n_reads)
- list_add(&carry_reads, &b->requests);
- /* there could still be requests on that ring list,
- * in case local io is still pending */
- list_del(&b->requests);
-
- /* dec_ap_pending corresponding to queue_barrier.
- * the newest barrier may not have been queued yet,
- * in which case w.cb is still NULL. */
- if (b->w.cb != NULL)
- dec_ap_pending(b->w.mdev);
-
- if (b == tconn->newest_tle) {
- /* recycle, but reinit! */
- if (tmp != NULL)
- conn_err(tconn, "ASSERT FAILED tmp == NULL");
- INIT_LIST_HEAD(&b->requests);
- list_splice(&carry_reads, &b->requests);
- INIT_LIST_HEAD(&b->w.list);
- b->w.cb = NULL;
- b->br_number = atomic_inc_return(&tconn->current_tle_nr);
- b->n_writes = 0;
-
- *pn = b;
- break;
- }
- *pn = tmp;
- kfree(b);
- }
- b = tmp;
- list_splice(&carry_reads, &b->requests);
- }
-
- /* Actions operating on the disk state, also want to work on
- requests that got barrier acked. */
- switch (what) {
- case FAIL_FROZEN_DISK_IO:
- case RESTART_FROZEN_DISK_IO:
- list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
- _req_mod(req, what);
- }
- case CONNECTION_LOST_WHILE_PENDING:
- case RESEND:
- break;
- default:
- conn_err(tconn, "what = %d in _tl_restart()\n", what);
- }
+void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
+{
+ spin_lock_irq(&tconn->req_lock);
+ _tl_restart(tconn, what);
+ spin_unlock_irq(&tconn->req_lock);
}
/**
@@ -443,36 +300,7 @@
*/
void tl_clear(struct drbd_tconn *tconn)
{
- struct list_head *le, *tle;
- struct drbd_request *r;
-
- spin_lock_irq(&tconn->req_lock);
-
- _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
-
- /* we expect this list to be empty. */
- if (!list_empty(&tconn->out_of_sequence_requests))
- conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
-
- /* but just in case, clean it up anyways! */
- list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
- r = list_entry(le, struct drbd_request, tl_requests);
- /* It would be nice to complete outside of spinlock.
- * But this is easier for now. */
- _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
- }
-
- /* ensure bit indicating barrier is required is clear */
- clear_bit(CREATE_BARRIER, &tconn->flags);
-
- spin_unlock_irq(&tconn->req_lock);
-}
-
-void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
-{
- spin_lock_irq(&tconn->req_lock);
- _tl_restart(tconn, what);
- spin_unlock_irq(&tconn->req_lock);
+ tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
}
/**
@@ -482,31 +310,16 @@
void tl_abort_disk_io(struct drbd_conf *mdev)
{
struct drbd_tconn *tconn = mdev->tconn;
- struct drbd_tl_epoch *b;
- struct list_head *le, *tle;
- struct drbd_request *req;
+ struct drbd_request *req, *r;
spin_lock_irq(&tconn->req_lock);
- b = tconn->oldest_tle;
- while (b) {
- list_for_each_safe(le, tle, &b->requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
- if (!(req->rq_state & RQ_LOCAL_PENDING))
- continue;
- if (req->w.mdev == mdev)
- _req_mod(req, ABORT_DISK_IO);
- }
- b = b->next;
- }
-
- list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
- req = list_entry(le, struct drbd_request, tl_requests);
+ list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
if (!(req->rq_state & RQ_LOCAL_PENDING))
continue;
- if (req->w.mdev == mdev)
- _req_mod(req, ABORT_DISK_IO);
+ if (req->w.mdev != mdev)
+ continue;
+ _req_mod(req, ABORT_DISK_IO);
}
-
spin_unlock_irq(&tconn->req_lock);
}
@@ -2680,17 +2493,21 @@
if (set_resource_options(tconn, res_opts))
goto fail;
- if (!tl_init(tconn))
- goto fail;
-
tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
if (!tconn->current_epoch)
goto fail;
+
+ INIT_LIST_HEAD(&tconn->transfer_log);
+
INIT_LIST_HEAD(&tconn->current_epoch->list);
tconn->epochs = 1;
spin_lock_init(&tconn->epoch_lock);
tconn->write_ordering = WO_bdev_flush;
+ tconn->send.seen_any_write_yet = false;
+ tconn->send.current_epoch_nr = 0;
+ tconn->send.current_epoch_writes = 0;
+
tconn->cstate = C_STANDALONE;
mutex_init(&tconn->cstate_mutex);
spin_lock_init(&tconn->req_lock);
@@ -2713,7 +2530,6 @@
fail:
kfree(tconn->current_epoch);
- tl_cleanup(tconn);
free_cpumask_var(tconn->cpu_mask);
drbd_free_socket(&tconn->meta);
drbd_free_socket(&tconn->data);
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index c5d4fac..bbc5c2f 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -622,6 +622,8 @@
/* Wait until nothing is on the fly :) */
wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0);
+ /* FIXME also wait for all pending P_BARRIER_ACK? */
+
if (new_role == R_SECONDARY) {
set_disk_ro(mdev->vdisk, true);
if (get_ldev(mdev)) {
@@ -1436,6 +1438,12 @@
drbd_suspend_io(mdev);
/* also wait for the last barrier ack. */
+ /* FIXME see also https://daiquiri.linbit/cgi-bin/bugzilla/show_bug.cgi?id=171
+ * We need a way to either ignore barrier acks for barriers sent before a device
+ * was attached, or a way to wait for all pending barrier acks to come in.
+ * As barriers are counted per resource,
+ * we'd need to suspend io on all devices of a resource.
+ */
wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_pending_cnt) || drbd_suspended(mdev));
/* and for any other previously queued work */
drbd_flush_workqueue(mdev);
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 34fc33b..7fe6b01 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -4451,6 +4451,7 @@
conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
atomic_set(&tconn->current_epoch->epoch_size, 0);
+ tconn->send.seen_any_write_yet = false;
conn_info(tconn, "Connection closed\n");
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index e609557..ca28b56 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -149,46 +149,16 @@
drbd_req_free(req);
}
-static void queue_barrier(struct drbd_conf *mdev)
-{
- struct drbd_tl_epoch *b;
- struct drbd_tconn *tconn = mdev->tconn;
-
- /* We are within the req_lock. Once we queued the barrier for sending,
- * we set the CREATE_BARRIER bit. It is cleared as soon as a new
- * barrier/epoch object is added. This is the only place this bit is
- * set. It indicates that the barrier for this epoch is already queued,
- * and no new epoch has been created yet. */
- if (test_bit(CREATE_BARRIER, &tconn->flags))
- return;
-
- b = tconn->newest_tle;
- b->w.cb = w_send_barrier;
- b->w.mdev = mdev;
- /* inc_ap_pending done here, so we won't
- * get imbalanced on connection loss.
- * dec_ap_pending will be done in got_BarrierAck
- * or (on connection loss) in tl_clear. */
- inc_ap_pending(mdev);
- drbd_queue_work(&tconn->sender_work, &b->w);
- set_bit(CREATE_BARRIER, &tconn->flags);
+static void wake_all_senders(struct drbd_tconn *tconn) {
+ wake_up(&tconn->sender_work.q_wait);
}
-static void _about_to_complete_local_write(struct drbd_conf *mdev,
- struct drbd_request *req)
+/* must hold resource->req_lock */
+static void start_new_tl_epoch(struct drbd_tconn *tconn)
{
- const unsigned long s = req->rq_state;
-
- /* Before we can signal completion to the upper layers,
- * we may need to close the current epoch.
- * We can skip this, if this request has not even been sent, because we
- * did not have a fully established connection yet/anymore, during
- * bitmap exchange, or while we are C_AHEAD due to congestion policy.
- */
- if (mdev->state.conn >= C_CONNECTED &&
- (s & RQ_NET_SENT) != 0 &&
- req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
- queue_barrier(mdev);
+ tconn->current_tle_writes = 0;
+ atomic_inc(&tconn->current_tle_nr);
+ wake_all_senders(tconn);
}
void complete_master_bio(struct drbd_conf *mdev,
@@ -320,9 +290,16 @@
} else if (!(s & RQ_POSTPONED))
D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
- /* for writes we need to do some extra housekeeping */
- if (rw == WRITE)
- _about_to_complete_local_write(mdev, req);
+ /* Before we can signal completion to the upper layers,
+ * we may need to close the current transfer log epoch.
+ * We are within the request lock, so we can simply compare
+ * the request epoch number with the current transfer log
+ * epoch number. If they match, increase the current_tle_nr,
+ * and reset the transfer log epoch write_cnt.
+ */
+ if (rw == WRITE &&
+ req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
+ start_new_tl_epoch(mdev->tconn);
/* Update disk stats */
_drbd_end_io_acct(mdev, req);
@@ -514,15 +491,6 @@
* hurting performance. */
set_bit(UNPLUG_REMOTE, &mdev->flags);
- /* see __drbd_make_request,
- * just after it grabs the req_lock */
- D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
-
- req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
-
- /* increment size of current epoch */
- mdev->tconn->newest_tle->n_writes++;
-
/* queue work item to send data */
D_ASSERT(req->rq_state & RQ_NET_PENDING);
req->rq_state |= RQ_NET_QUEUED;
@@ -534,8 +502,8 @@
nc = rcu_dereference(mdev->tconn->net_conf);
p = nc->max_epoch_size;
rcu_read_unlock();
- if (mdev->tconn->newest_tle->n_writes >= p)
- queue_barrier(mdev);
+ if (mdev->tconn->current_tle_writes >= p)
+ start_new_tl_epoch(mdev->tconn);
break;
@@ -692,6 +660,7 @@
During connection handshake, we ensure that the peer was not rebooted. */
if (!(req->rq_state & RQ_NET_OK)) {
if (req->w.cb) {
+ /* w.cb expected to be w_send_dblock, or w_send_read_req */
drbd_queue_work(&mdev->tconn->sender_work, &req->w);
rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
}
@@ -708,7 +677,6 @@
* this is bad, because if the connection is lost now,
* we won't be able to clean them up... */
dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
- list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
}
if ((req->rq_state & RQ_NET_MASK) != 0) {
req->rq_state |= RQ_NET_DONE;
@@ -835,7 +803,6 @@
const int rw = bio_rw(bio);
const int size = bio->bi_size;
const sector_t sector = bio->bi_sector;
- struct drbd_tl_epoch *b = NULL;
struct drbd_request *req;
struct net_conf *nc;
int local, remote, send_oos = 0;
@@ -916,24 +883,6 @@
goto fail_free_complete;
}
- /* For WRITE request, we have to make sure that we have an
- * unused_spare_tle, in case we need to start a new epoch.
- * I try to be smart and avoid to pre-allocate always "just in case",
- * but there is a race between testing the bit and pointer outside the
- * spinlock, and grabbing the spinlock.
- * if we lost that race, we retry. */
- if (rw == WRITE && (remote || send_oos) &&
- mdev->tconn->unused_spare_tle == NULL &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-allocate_barrier:
- b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
- if (!b) {
- dev_err(DEV, "Failed to alloc barrier.\n");
- err = -ENOMEM;
- goto fail_free_complete;
- }
- }
-
/* GOOD, everything prepared, grab the spin_lock */
spin_lock_irq(&mdev->tconn->req_lock);
@@ -969,42 +918,9 @@
}
}
- if (b && mdev->tconn->unused_spare_tle == NULL) {
- mdev->tconn->unused_spare_tle = b;
- b = NULL;
- }
- if (rw == WRITE && (remote || send_oos) &&
- mdev->tconn->unused_spare_tle == NULL &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
- /* someone closed the current epoch
- * while we were grabbing the spinlock */
- spin_unlock_irq(&mdev->tconn->req_lock);
- goto allocate_barrier;
- }
-
-
/* Update disk stats */
_drbd_start_io_acct(mdev, req, bio);
- /* _maybe_start_new_epoch(mdev);
- * If we need to generate a write barrier packet, we have to add the
- * new epoch (barrier) object, and queue the barrier packet for sending,
- * and queue the req's data after it _within the same lock_, otherwise
- * we have race conditions were the reorder domains could be mixed up.
- *
- * Even read requests may start a new epoch and queue the corresponding
- * barrier packet. To get the write ordering right, we only have to
- * make sure that, if this is a write request and it triggered a
- * barrier packet, this request is queued within the same spinlock. */
- if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
- test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
- _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
- mdev->tconn->unused_spare_tle = NULL;
- } else {
- D_ASSERT(!(remote && rw == WRITE &&
- test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
- }
-
/* NOTE
* Actually, 'local' may be wrong here already, since we may have failed
* to write to the meta data, and may become wrong anytime because of
@@ -1025,7 +941,12 @@
if (local)
_req_mod(req, TO_BE_SUBMITTED);
- list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
+ /* which transfer log epoch does this belong to? */
+ req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
+ if (rw == WRITE)
+ mdev->tconn->current_tle_writes++;
+
+ list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
/* NOTE remote first: to get the concurrent write detection right,
* we must register the request before start of local IO. */
@@ -1059,7 +980,9 @@
}
if (congested) {
- queue_barrier(mdev); /* last barrier, after mirrored writes */
+ if (mdev->tconn->current_tle_writes)
+ /* start a new epoch for non-mirrored writes */
+ start_new_tl_epoch(mdev->tconn);
if (nc->on_congestion == OC_PULL_AHEAD)
_drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
@@ -1070,7 +993,6 @@
rcu_read_unlock();
spin_unlock_irq(&mdev->tconn->req_lock);
- kfree(b); /* if someone else has beaten us to it... */
if (local) {
req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
@@ -1108,7 +1030,6 @@
drbd_req_free(req);
dec_ap_bio(mdev);
- kfree(b);
return ret;
}
@@ -1164,12 +1085,23 @@
return limit;
}
+struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
+{
+ /* Walk the transfer log,
+ * and find the oldest not yet completed request */
+ struct drbd_request *r;
+ list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+ if (r->rq_state & (RQ_NET_PENDING|RQ_LOCAL_PENDING))
+ return r;
+ }
+ return NULL;
+}
+
void request_timer_fn(unsigned long data)
{
struct drbd_conf *mdev = (struct drbd_conf *) data;
struct drbd_tconn *tconn = mdev->tconn;
struct drbd_request *req; /* oldest request */
- struct list_head *le;
struct net_conf *nc;
unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
unsigned long now;
@@ -1193,16 +1125,13 @@
now = jiffies;
spin_lock_irq(&tconn->req_lock);
- le = &tconn->oldest_tle->requests;
- if (list_empty(le)) {
+ req = find_oldest_request(tconn);
+ if (!req) {
spin_unlock_irq(&tconn->req_lock);
mod_timer(&mdev->request_timer, now + et);
return;
}
- le = le->prev;
- req = list_entry(le, struct drbd_request, tl_requests);
-
/* The request is considered timed out, if
* - we have some effective timeout from the configuration,
* with above state restrictions applied,
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 39ece3a..66be391 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1210,34 +1210,25 @@
return 0;
}
-int w_send_barrier(struct drbd_work *w, int cancel)
+/* FIXME
+ * We need to track the number of pending barrier acks,
+ * and to be able to wait for them.
+ * See also comment in drbd_adm_attach before drbd_suspend_io.
+ */
+int drbd_send_barrier(struct drbd_tconn *tconn)
{
- struct drbd_socket *sock;
- struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
- struct drbd_conf *mdev = w->mdev;
struct p_barrier *p;
+ struct drbd_socket *sock;
- /* really avoid racing with tl_clear. w.cb may have been referenced
- * just before it was reassigned and re-queued, so double check that.
- * actually, this race was harmless, since we only try to send the
- * barrier packet here, and otherwise do nothing with the object.
- * but compare with the head of w_clear_epoch */
- spin_lock_irq(&mdev->tconn->req_lock);
- if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
- cancel = 1;
- spin_unlock_irq(&mdev->tconn->req_lock);
- if (cancel)
- return 0;
-
- sock = &mdev->tconn->data;
- p = drbd_prepare_command(mdev, sock);
+ sock = &tconn->data;
+ p = conn_prepare_command(tconn, sock);
if (!p)
return -EIO;
- p->barrier = b->br_number;
- /* inc_ap_pending was done where this was queued.
- * dec_ap_pending will be done in got_BarrierAck
- * or (on connection loss) in w_clear_epoch. */
- return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
+ p->barrier = tconn->send.current_epoch_nr;
+ p->pad = 0;
+ tconn->send.current_epoch_writes = 0;
+
+ return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
}
int w_send_write_hint(struct drbd_work *w, int cancel)
@@ -1257,6 +1248,7 @@
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1264,6 +1256,20 @@
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ /* this time, no tconn->send.current_epoch_writes++;
+ * If it was sent, it was the closing barrier for the last
+ * replicated epoch, before we went into AHEAD mode.
+ * No more barriers will be sent, until we leave AHEAD mode again. */
+
err = drbd_send_out_of_sync(mdev, req);
req_mod(req, OOS_HANDED_TO_NETWORK);
@@ -1280,6 +1286,7 @@
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1287,6 +1294,17 @@
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ tconn->send.current_epoch_writes++;
+
err = drbd_send_dblock(mdev, req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
@@ -1303,6 +1321,7 @@
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1310,6 +1329,15 @@
return 0;
}
+ /* Even read requests may close a write epoch,
+ * if there was any yet. */
+ if (tconn->send.seen_any_write_yet &&
+ tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+
err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
(unsigned long)req);
@@ -1673,6 +1701,34 @@
mutex_unlock(mdev->state_mutex);
}
+/* If the resource already closed the current epoch, but we did not
+ * (because we have not yet seen new requests), we should send the
+ * corresponding barrier now. Must be checked within the same spinlock
+ * that is used to check for new requests. */
+bool need_to_send_barrier(struct drbd_tconn *connection)
+{
+ if (!connection->send.seen_any_write_yet)
+ return false;
+
+ /* Skip barriers that do not contain any writes.
+ * This may happen during AHEAD mode. */
+ if (!connection->send.current_epoch_writes)
+ return false;
+
+ /* ->req_lock is held when requests are queued on
+ * connection->sender_work, and put into ->transfer_log.
+ * It is also held when ->current_tle_nr is increased.
+ * So either there are already new requests queued,
+ * and corresponding barriers will be send there.
+ * Or nothing new is queued yet, so the difference will be 1.
+ */
+ if (atomic_read(&connection->current_tle_nr) !=
+ connection->send.current_epoch_nr + 1)
+ return false;
+
+ return true;
+}
+
bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
@@ -1690,15 +1746,79 @@
return !list_empty(work_list);
}
+void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
+{
+ DEFINE_WAIT(wait);
+ struct net_conf *nc;
+ int uncork, cork;
+
+ dequeue_work_item(&connection->sender_work, work_list);
+ if (!list_empty(work_list))
+ return;
+
+ /* Still nothing to do?
+ * Maybe we still need to close the current epoch,
+ * even if no new requests are queued yet.
+ *
+ * Also, poke TCP, just in case.
+ * Then wait for new work (or signal). */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ uncork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ if (uncork) {
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket)
+ drbd_tcp_uncork(connection->data.socket);
+ mutex_unlock(&connection->data.mutex);
+ }
+
+ for (;;) {
+ int send_barrier;
+ prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
+ spin_lock_irq(&connection->req_lock);
+ spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ list_splice_init(&connection->sender_work.q, work_list);
+ spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ if (!list_empty(work_list) || signal_pending(current)) {
+ spin_unlock_irq(&connection->req_lock);
+ break;
+ }
+ send_barrier = need_to_send_barrier(connection);
+ spin_unlock_irq(&connection->req_lock);
+ if (send_barrier) {
+ drbd_send_barrier(connection);
+ connection->send.current_epoch_nr++;
+ }
+ schedule();
+ /* may be woken up for other things but new work, too,
+ * e.g. if the current epoch got closed.
+ * In which case we send the barrier above. */
+ }
+ finish_wait(&connection->sender_work.q_wait, &wait);
+
+ /* someone may have changed the config while we have been waiting above. */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ cork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket) {
+ if (cork)
+ drbd_tcp_cork(connection->data.socket);
+ else if (!uncork)
+ drbd_tcp_uncork(connection->data.socket);
+ }
+ mutex_unlock(&connection->data.mutex);
+}
+
int drbd_worker(struct drbd_thread *thi)
{
struct drbd_tconn *tconn = thi->tconn;
struct drbd_work *w = NULL;
struct drbd_conf *mdev;
- struct net_conf *nc;
LIST_HEAD(work_list);
int vnr;
- int cork;
while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi);
@@ -1706,29 +1826,7 @@
/* as long as we use drbd_queue_work_front(),
* we may only dequeue single work items here, not batches. */
if (list_empty(&work_list))
- dequeue_work_item(&tconn->sender_work, &work_list);
-
- /* Still nothing to do? Poke TCP, just in case,
- * then wait for new work (or signal). */
- if (list_empty(&work_list)) {
- mutex_lock(&tconn->data.mutex);
- rcu_read_lock();
- nc = rcu_dereference(tconn->net_conf);
- cork = nc ? nc->tcp_cork : 0;
- rcu_read_unlock();
-
- if (tconn->data.socket && cork)
- drbd_tcp_uncork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
-
- wait_event_interruptible(tconn->sender_work.q_wait,
- dequeue_work_item(&tconn->sender_work, &work_list));
-
- mutex_lock(&tconn->data.mutex);
- if (tconn->data.socket && cork)
- drbd_tcp_cork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
- }
+ wait_for_work(tconn, &work_list);
if (signal_pending(current)) {
flush_signals(current);