blob: 0bbecf45b485f183b41558480924fc62e9223630 [file] [log] [blame]
Philipp Reisnerb411b362009-09-25 16:07:19 -07001/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
Philipp Reisnerb411b362009-09-25 16:07:19 -070026#include <linux/module.h>
Philipp Reisnerb411b362009-09-25 16:07:19 -070027#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/smp_lock.h>
30#include <linux/wait.h>
31#include <linux/mm.h>
32#include <linux/memcontrol.h>
33#include <linux/mm_inline.h>
34#include <linux/slab.h>
35#include <linux/random.h>
Philipp Reisnerb411b362009-09-25 16:07:19 -070036#include <linux/string.h>
37#include <linux/scatterlist.h>
38
39#include "drbd_int.h"
40#include "drbd_req.h"
Philipp Reisnerb411b362009-09-25 16:07:19 -070041
42#define SLEEP_TIME (HZ/10)
43
44static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45
46
47
48/* defined here:
49 drbd_md_io_complete
50 drbd_endio_write_sec
51 drbd_endio_read_sec
52 drbd_endio_pri
53
54 * more endio handlers:
55 atodb_endio in drbd_actlog.c
56 drbd_bm_async_io_complete in drbd_bitmap.c
57
58 * For all these callbacks, note the following:
59 * The callbacks will be called in irq context by the IDE drivers,
60 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
61 * Try to get the locking right :)
62 *
63 */
64
65
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
70rwlock_t global_state_lock;
71
72/* used for synchronous meta data and bitmap IO
73 * submitted by drbd_md_sync_page_io()
74 */
75void drbd_md_io_complete(struct bio *bio, int error)
76{
77 struct drbd_md_io *md_io;
78
79 md_io = (struct drbd_md_io *)bio->bi_private;
80 md_io->error = error;
81
Philipp Reisnerb411b362009-09-25 16:07:19 -070082 complete(&md_io->event);
83}
84
85/* reads on behalf of the partner,
86 * "submitted" by the receiver
87 */
88void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
89{
90 unsigned long flags = 0;
91 struct drbd_epoch_entry *e = NULL;
92 struct drbd_conf *mdev;
93 int uptodate = bio_flagged(bio, BIO_UPTODATE);
94
95 e = bio->bi_private;
96 mdev = e->mdev;
97
98 if (error)
99 dev_warn(DEV, "read: error=%d s=%llus\n", error,
100 (unsigned long long)e->sector);
101 if (!error && !uptodate) {
102 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
103 (unsigned long long)e->sector);
104 /* strange behavior of some lower level drivers...
105 * fail the request by clearing the uptodate flag,
106 * but do not return any error?! */
107 error = -EIO;
108 }
109
110 D_ASSERT(e->block_id != ID_VACANT);
111
Philipp Reisnerb411b362009-09-25 16:07:19 -0700112 spin_lock_irqsave(&mdev->req_lock, flags);
113 mdev->read_cnt += e->size >> 9;
114 list_del(&e->w.list);
115 if (list_empty(&mdev->read_ee))
116 wake_up(&mdev->ee_wait);
117 spin_unlock_irqrestore(&mdev->req_lock, flags);
118
119 drbd_chk_io_error(mdev, error, FALSE);
120 drbd_queue_work(&mdev->data.work, &e->w);
121 put_ldev(mdev);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700122}
123
124/* writes on behalf of the partner, or resync writes,
125 * "submitted" by the receiver.
126 */
127void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
128{
129 unsigned long flags = 0;
130 struct drbd_epoch_entry *e = NULL;
131 struct drbd_conf *mdev;
132 sector_t e_sector;
133 int do_wake;
134 int is_syncer_req;
135 int do_al_complete_io;
136 int uptodate = bio_flagged(bio, BIO_UPTODATE);
137 int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
138
139 e = bio->bi_private;
140 mdev = e->mdev;
141
142 if (error)
143 dev_warn(DEV, "write: error=%d s=%llus\n", error,
144 (unsigned long long)e->sector);
145 if (!error && !uptodate) {
146 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
147 (unsigned long long)e->sector);
148 /* strange behavior of some lower level drivers...
149 * fail the request by clearing the uptodate flag,
150 * but do not return any error?! */
151 error = -EIO;
152 }
153
154 /* error == -ENOTSUPP would be a better test,
155 * alas it is not reliable */
156 if (error && is_barrier && e->flags & EE_IS_BARRIER) {
157 drbd_bump_write_ordering(mdev, WO_bdev_flush);
158 spin_lock_irqsave(&mdev->req_lock, flags);
159 list_del(&e->w.list);
160 e->w.cb = w_e_reissue;
161 /* put_ldev actually happens below, once we come here again. */
162 __release(local);
163 spin_unlock_irqrestore(&mdev->req_lock, flags);
164 drbd_queue_work(&mdev->data.work, &e->w);
165 return;
166 }
167
168 D_ASSERT(e->block_id != ID_VACANT);
169
Philipp Reisnerb411b362009-09-25 16:07:19 -0700170 spin_lock_irqsave(&mdev->req_lock, flags);
171 mdev->writ_cnt += e->size >> 9;
172 is_syncer_req = is_syncer_block_id(e->block_id);
173
174 /* after we moved e to done_ee,
175 * we may no longer access it,
176 * it may be freed/reused already!
177 * (as soon as we release the req_lock) */
178 e_sector = e->sector;
179 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
180
181 list_del(&e->w.list); /* has been on active_ee or sync_ee */
182 list_add_tail(&e->w.list, &mdev->done_ee);
183
Philipp Reisnerb411b362009-09-25 16:07:19 -0700184 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
185 * neither did we wake possibly waiting conflicting requests.
186 * done from "drbd_process_done_ee" within the appropriate w.cb
187 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
188
189 do_wake = is_syncer_req
190 ? list_empty(&mdev->sync_ee)
191 : list_empty(&mdev->active_ee);
192
193 if (error)
194 __drbd_chk_io_error(mdev, FALSE);
195 spin_unlock_irqrestore(&mdev->req_lock, flags);
196
197 if (is_syncer_req)
198 drbd_rs_complete_io(mdev, e_sector);
199
200 if (do_wake)
201 wake_up(&mdev->ee_wait);
202
203 if (do_al_complete_io)
204 drbd_al_complete_io(mdev, e_sector);
205
206 wake_asender(mdev);
207 put_ldev(mdev);
208
209}
210
211/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
212 */
213void drbd_endio_pri(struct bio *bio, int error)
214{
215 unsigned long flags;
216 struct drbd_request *req = bio->bi_private;
217 struct drbd_conf *mdev = req->mdev;
218 struct bio_and_error m;
219 enum drbd_req_event what;
220 int uptodate = bio_flagged(bio, BIO_UPTODATE);
221
222 if (error)
223 dev_warn(DEV, "p %s: error=%d\n",
224 bio_data_dir(bio) == WRITE ? "write" : "read", error);
225 if (!error && !uptodate) {
226 dev_warn(DEV, "p %s: setting error to -EIO\n",
227 bio_data_dir(bio) == WRITE ? "write" : "read");
228 /* strange behavior of some lower level drivers...
229 * fail the request by clearing the uptodate flag,
230 * but do not return any error?! */
231 error = -EIO;
232 }
233
Philipp Reisnerb411b362009-09-25 16:07:19 -0700234 /* to avoid recursion in __req_mod */
235 if (unlikely(error)) {
236 what = (bio_data_dir(bio) == WRITE)
237 ? write_completed_with_error
238 : (bio_rw(bio) == READA)
239 ? read_completed_with_error
240 : read_ahead_completed_with_error;
241 } else
242 what = completed_ok;
243
244 bio_put(req->private_bio);
245 req->private_bio = ERR_PTR(error);
246
247 spin_lock_irqsave(&mdev->req_lock, flags);
248 __req_mod(req, what, &m);
249 spin_unlock_irqrestore(&mdev->req_lock, flags);
250
251 if (m.bio)
252 complete_master_bio(mdev, &m);
253}
254
255int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256{
257 struct drbd_request *req = container_of(w, struct drbd_request, w);
258
259 /* NOTE: mdev->ldev can be NULL by the time we get here! */
260 /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
261
262 /* the only way this callback is scheduled is from _req_may_be_done,
263 * when it is done and had a local write error, see comments there */
264 drbd_req_free(req);
265
266 return TRUE;
267}
268
269int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
270{
271 struct drbd_request *req = container_of(w, struct drbd_request, w);
272
273 /* We should not detach for read io-error,
274 * but try to WRITE the P_DATA_REPLY to the failed location,
275 * to give the disk the chance to relocate that block */
276
277 spin_lock_irq(&mdev->req_lock);
278 if (cancel ||
279 mdev->state.conn < C_CONNECTED ||
280 mdev->state.pdsk <= D_INCONSISTENT) {
281 _req_mod(req, send_canceled);
282 spin_unlock_irq(&mdev->req_lock);
283 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
284 return 1;
285 }
286 spin_unlock_irq(&mdev->req_lock);
287
288 return w_send_read_req(mdev, w, 0);
289}
290
291int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
292{
293 ERR_IF(cancel) return 1;
294 dev_err(DEV, "resync inactive, but callback triggered??\n");
295 return 1; /* Simply ignore this! */
296}
297
298void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
299{
300 struct hash_desc desc;
301 struct scatterlist sg;
302 struct bio_vec *bvec;
303 int i;
304
305 desc.tfm = tfm;
306 desc.flags = 0;
307
308 sg_init_table(&sg, 1);
309 crypto_hash_init(&desc);
310
311 __bio_for_each_segment(bvec, bio, i, 0) {
312 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
313 crypto_hash_update(&desc, &sg, sg.length);
314 }
315 crypto_hash_final(&desc, digest);
316}
317
318static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
319{
320 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
321 int digest_size;
322 void *digest;
323 int ok;
324
325 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
326
327 if (unlikely(cancel)) {
328 drbd_free_ee(mdev, e);
329 return 1;
330 }
331
332 if (likely(drbd_bio_uptodate(e->private_bio))) {
333 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
334 digest = kmalloc(digest_size, GFP_NOIO);
335 if (digest) {
336 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
337
338 inc_rs_pending(mdev);
339 ok = drbd_send_drequest_csum(mdev,
340 e->sector,
341 e->size,
342 digest,
343 digest_size,
344 P_CSUM_RS_REQUEST);
345 kfree(digest);
346 } else {
347 dev_err(DEV, "kmalloc() of digest failed.\n");
348 ok = 0;
349 }
350 } else
351 ok = 1;
352
353 drbd_free_ee(mdev, e);
354
355 if (unlikely(!ok))
356 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
357 return ok;
358}
359
360#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
361
362static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
363{
364 struct drbd_epoch_entry *e;
365
366 if (!get_ldev(mdev))
367 return 0;
368
369 /* GFP_TRY, because if there is no memory available right now, this may
370 * be rescheduled for later. It is "only" background resync, after all. */
371 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
372 if (!e) {
373 put_ldev(mdev);
374 return 2;
375 }
376
377 spin_lock_irq(&mdev->req_lock);
378 list_add(&e->w.list, &mdev->read_ee);
379 spin_unlock_irq(&mdev->req_lock);
380
381 e->private_bio->bi_end_io = drbd_endio_read_sec;
382 e->private_bio->bi_rw = READ;
383 e->w.cb = w_e_send_csum;
384
385 mdev->read_cnt += size >> 9;
386 drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
387
388 return 1;
389}
390
391void resync_timer_fn(unsigned long data)
392{
393 unsigned long flags;
394 struct drbd_conf *mdev = (struct drbd_conf *) data;
395 int queue;
396
397 spin_lock_irqsave(&mdev->req_lock, flags);
398
399 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
400 queue = 1;
401 if (mdev->state.conn == C_VERIFY_S)
402 mdev->resync_work.cb = w_make_ov_request;
403 else
404 mdev->resync_work.cb = w_make_resync_request;
405 } else {
406 queue = 0;
407 mdev->resync_work.cb = w_resync_inactive;
408 }
409
410 spin_unlock_irqrestore(&mdev->req_lock, flags);
411
412 /* harmless race: list_empty outside data.work.q_lock */
413 if (list_empty(&mdev->resync_work.list) && queue)
414 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
415}
416
Philipp Reisnercdd67a72010-05-04 16:57:18 +0200417static int calc_resync_rate(struct drbd_conf *mdev)
418{
419 int d = mdev->data_delay / 1000; /* us -> ms */
420 int td = mdev->sync_conf.throttle_th * 100; /* 0.1s -> ms */
421 int hd = mdev->sync_conf.hold_off_th * 100; /* 0.1s -> ms */
422 int cr = mdev->sync_conf.rate;
423
424 return d <= td ? cr :
425 d >= hd ? 0 :
426 cr + (cr * (td - d) / (hd - td));
427}
428
Philipp Reisnerb411b362009-09-25 16:07:19 -0700429int w_make_resync_request(struct drbd_conf *mdev,
430 struct drbd_work *w, int cancel)
431{
432 unsigned long bit;
433 sector_t sector;
434 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
435 int max_segment_size = queue_max_segment_size(mdev->rq_queue);
436 int number, i, size, pe, mx;
437 int align, queued, sndbuf;
438
439 if (unlikely(cancel))
440 return 1;
441
442 if (unlikely(mdev->state.conn < C_CONNECTED)) {
443 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
444 return 0;
445 }
446
447 if (mdev->state.conn != C_SYNC_TARGET)
448 dev_err(DEV, "%s in w_make_resync_request\n",
449 drbd_conn_str(mdev->state.conn));
450
451 if (!get_ldev(mdev)) {
452 /* Since we only need to access mdev->rsync a
453 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
454 to continue resync with a broken disk makes no sense at
455 all */
456 dev_err(DEV, "Disk broke down during resync!\n");
457 mdev->resync_work.cb = w_resync_inactive;
458 return 1;
459 }
460
Philipp Reisnercdd67a72010-05-04 16:57:18 +0200461 mdev->c_sync_rate = calc_resync_rate(mdev);
462 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700463 pe = atomic_read(&mdev->rs_pending_cnt);
464
465 mutex_lock(&mdev->data.mutex);
466 if (mdev->data.socket)
467 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
468 else
469 mx = 1;
470 mutex_unlock(&mdev->data.mutex);
471
472 /* For resync rates >160MB/sec, allow more pending RS requests */
473 if (number > mx)
474 mx = number;
475
476 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
477 if ((pe + number) > mx) {
478 number = mx - pe;
479 }
480
481 for (i = 0; i < number; i++) {
482 /* Stop generating RS requests, when half of the send buffer is filled */
483 mutex_lock(&mdev->data.mutex);
484 if (mdev->data.socket) {
485 queued = mdev->data.socket->sk->sk_wmem_queued;
486 sndbuf = mdev->data.socket->sk->sk_sndbuf;
487 } else {
488 queued = 1;
489 sndbuf = 0;
490 }
491 mutex_unlock(&mdev->data.mutex);
492 if (queued > sndbuf / 2)
493 goto requeue;
494
495next_sector:
496 size = BM_BLOCK_SIZE;
497 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
498
499 if (bit == -1UL) {
500 mdev->bm_resync_fo = drbd_bm_bits(mdev);
501 mdev->resync_work.cb = w_resync_inactive;
502 put_ldev(mdev);
503 return 1;
504 }
505
506 sector = BM_BIT_TO_SECT(bit);
507
508 if (drbd_try_rs_begin_io(mdev, sector)) {
509 mdev->bm_resync_fo = bit;
510 goto requeue;
511 }
512 mdev->bm_resync_fo = bit + 1;
513
514 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
515 drbd_rs_complete_io(mdev, sector);
516 goto next_sector;
517 }
518
519#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
520 /* try to find some adjacent bits.
521 * we stop if we have already the maximum req size.
522 *
523 * Additionally always align bigger requests, in order to
524 * be prepared for all stripe sizes of software RAIDs.
525 *
526 * we _do_ care about the agreed-upon q->max_segment_size
527 * here, as splitting up the requests on the other side is more
528 * difficult. the consequence is, that on lvm and md and other
529 * "indirect" devices, this is dead code, since
530 * q->max_segment_size will be PAGE_SIZE.
531 */
532 align = 1;
533 for (;;) {
534 if (size + BM_BLOCK_SIZE > max_segment_size)
535 break;
536
537 /* Be always aligned */
538 if (sector & ((1<<(align+3))-1))
539 break;
540
541 /* do not cross extent boundaries */
542 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
543 break;
544 /* now, is it actually dirty, after all?
545 * caution, drbd_bm_test_bit is tri-state for some
546 * obscure reason; ( b == 0 ) would get the out-of-band
547 * only accidentally right because of the "oddly sized"
548 * adjustment below */
549 if (drbd_bm_test_bit(mdev, bit+1) != 1)
550 break;
551 bit++;
552 size += BM_BLOCK_SIZE;
553 if ((BM_BLOCK_SIZE << align) <= size)
554 align++;
555 i++;
556 }
557 /* if we merged some,
558 * reset the offset to start the next drbd_bm_find_next from */
559 if (size > BM_BLOCK_SIZE)
560 mdev->bm_resync_fo = bit + 1;
561#endif
562
563 /* adjust very last sectors, in case we are oddly sized */
564 if (sector + (size>>9) > capacity)
565 size = (capacity-sector)<<9;
566 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
567 switch (read_for_csum(mdev, sector, size)) {
568 case 0: /* Disk failure*/
569 put_ldev(mdev);
570 return 0;
571 case 2: /* Allocation failed */
572 drbd_rs_complete_io(mdev, sector);
573 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
574 goto requeue;
575 /* case 1: everything ok */
576 }
577 } else {
578 inc_rs_pending(mdev);
579 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
580 sector, size, ID_SYNCER)) {
581 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
582 dec_rs_pending(mdev);
583 put_ldev(mdev);
584 return 0;
585 }
586 }
587 }
588
589 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
590 /* last syncer _request_ was sent,
591 * but the P_RS_DATA_REPLY not yet received. sync will end (and
592 * next sync group will resume), as soon as we receive the last
593 * resync data block, and the last bit is cleared.
594 * until then resync "work" is "inactive" ...
595 */
596 mdev->resync_work.cb = w_resync_inactive;
597 put_ldev(mdev);
598 return 1;
599 }
600
601 requeue:
602 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
603 put_ldev(mdev);
604 return 1;
605}
606
607static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
608{
609 int number, i, size;
610 sector_t sector;
611 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
612
613 if (unlikely(cancel))
614 return 1;
615
616 if (unlikely(mdev->state.conn < C_CONNECTED)) {
617 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
618 return 0;
619 }
620
621 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
622 if (atomic_read(&mdev->rs_pending_cnt) > number)
623 goto requeue;
624
625 number -= atomic_read(&mdev->rs_pending_cnt);
626
627 sector = mdev->ov_position;
628 for (i = 0; i < number; i++) {
629 if (sector >= capacity) {
630 mdev->resync_work.cb = w_resync_inactive;
631 return 1;
632 }
633
634 size = BM_BLOCK_SIZE;
635
636 if (drbd_try_rs_begin_io(mdev, sector)) {
637 mdev->ov_position = sector;
638 goto requeue;
639 }
640
641 if (sector + (size>>9) > capacity)
642 size = (capacity-sector)<<9;
643
644 inc_rs_pending(mdev);
645 if (!drbd_send_ov_request(mdev, sector, size)) {
646 dec_rs_pending(mdev);
647 return 0;
648 }
649 sector += BM_SECT_PER_BIT;
650 }
651 mdev->ov_position = sector;
652
653 requeue:
654 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
655 return 1;
656}
657
658
/*
 * Worker callback: frees the work object @w, calls ov_oos_print() and then
 * drbd_resync_finished().  Always returns 1; @cancel is ignored.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* w is released here and must not be touched afterwards */
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);
	return 1;
}
667
/*
 * Worker callback queued by drbd_resync_finished() to retry itself:
 * frees the work object @w and calls drbd_resync_finished() again.
 * Always returns 1; @cancel is ignored.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* w is released here and must not be touched afterwards */
	kfree(w);
	drbd_resync_finished(mdev);
	return 1;
}
676
677int drbd_resync_finished(struct drbd_conf *mdev)
678{
679 unsigned long db, dt, dbdt;
680 unsigned long n_oos;
681 union drbd_state os, ns;
682 struct drbd_work *w;
683 char *khelper_cmd = NULL;
684
685 /* Remove all elements from the resync LRU. Since future actions
686 * might set bits in the (main) bitmap, then the entries in the
687 * resync LRU would be wrong. */
688 if (drbd_rs_del_all(mdev)) {
689 /* In case this is not possible now, most probably because
690 * there are P_RS_DATA_REPLY Packets lingering on the worker's
691 * queue (or even the read operations for those packets
692 * is not finished by now). Retry in 100ms. */
693
694 drbd_kick_lo(mdev);
695 __set_current_state(TASK_INTERRUPTIBLE);
696 schedule_timeout(HZ / 10);
697 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
698 if (w) {
699 w->cb = w_resync_finished;
700 drbd_queue_work(&mdev->data.work, w);
701 return 1;
702 }
703 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
704 }
705
706 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
707 if (dt <= 0)
708 dt = 1;
709 db = mdev->rs_total;
710 dbdt = Bit2KB(db/dt);
711 mdev->rs_paused /= HZ;
712
713 if (!get_ldev(mdev))
714 goto out;
715
716 spin_lock_irq(&mdev->req_lock);
717 os = mdev->state;
718
719 /* This protects us against multiple calls (that can happen in the presence
720 of application IO), and against connectivity loss just before we arrive here. */
721 if (os.conn <= C_CONNECTED)
722 goto out_unlock;
723
724 ns = os;
725 ns.conn = C_CONNECTED;
726
727 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
728 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
729 "Online verify " : "Resync",
730 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
731
732 n_oos = drbd_bm_total_weight(mdev);
733
734 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
735 if (n_oos) {
736 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
737 n_oos, Bit2KB(1));
738 khelper_cmd = "out-of-sync";
739 }
740 } else {
741 D_ASSERT((n_oos - mdev->rs_failed) == 0);
742
743 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
744 khelper_cmd = "after-resync-target";
745
746 if (mdev->csums_tfm && mdev->rs_total) {
747 const unsigned long s = mdev->rs_same_csum;
748 const unsigned long t = mdev->rs_total;
749 const int ratio =
750 (t == 0) ? 0 :
751 (t < 100000) ? ((s*100)/t) : (s/(t/100));
752 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
753 "transferred %luK total %luK\n",
754 ratio,
755 Bit2KB(mdev->rs_same_csum),
756 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
757 Bit2KB(mdev->rs_total));
758 }
759 }
760
761 if (mdev->rs_failed) {
762 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
763
764 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
765 ns.disk = D_INCONSISTENT;
766 ns.pdsk = D_UP_TO_DATE;
767 } else {
768 ns.disk = D_UP_TO_DATE;
769 ns.pdsk = D_INCONSISTENT;
770 }
771 } else {
772 ns.disk = D_UP_TO_DATE;
773 ns.pdsk = D_UP_TO_DATE;
774
775 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
776 if (mdev->p_uuid) {
777 int i;
778 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
779 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
780 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
781 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
782 } else {
783 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
784 }
785 }
786
787 drbd_uuid_set_bm(mdev, 0UL);
788
789 if (mdev->p_uuid) {
790 /* Now the two UUID sets are equal, update what we
791 * know of the peer. */
792 int i;
793 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
794 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
795 }
796 }
797
798 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
799out_unlock:
800 spin_unlock_irq(&mdev->req_lock);
801 put_ldev(mdev);
802out:
803 mdev->rs_total = 0;
804 mdev->rs_failed = 0;
805 mdev->rs_paused = 0;
806 mdev->ov_start_sector = 0;
807
808 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
809 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
810 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
811 }
812
813 if (khelper_cmd)
814 drbd_khelper(mdev, khelper_cmd);
815
816 return 1;
817}
818
819/* helper */
820static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
821{
822 if (drbd_bio_has_active_page(e->private_bio)) {
823 /* This might happen if sendpage() has not finished */
824 spin_lock_irq(&mdev->req_lock);
825 list_add_tail(&e->w.list, &mdev->net_ee);
826 spin_unlock_irq(&mdev->req_lock);
827 } else
828 drbd_free_ee(mdev, e);
829}
830
831/**
832 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
833 * @mdev: DRBD device.
834 * @w: work object.
835 * @cancel: The connection will be closed anyways
836 */
837int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
838{
839 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
840 int ok;
841
842 if (unlikely(cancel)) {
843 drbd_free_ee(mdev, e);
844 dec_unacked(mdev);
845 return 1;
846 }
847
848 if (likely(drbd_bio_uptodate(e->private_bio))) {
849 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
850 } else {
851 if (__ratelimit(&drbd_ratelimit_state))
852 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
853 (unsigned long long)e->sector);
854
855 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
856 }
857
858 dec_unacked(mdev);
859
860 move_to_net_ee_or_free(mdev, e);
861
862 if (unlikely(!ok))
863 dev_err(DEV, "drbd_send_block() failed\n");
864 return ok;
865}
866
867/**
868 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
869 * @mdev: DRBD device.
870 * @w: work object.
871 * @cancel: The connection will be closed anyways
872 */
873int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
874{
875 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
876 int ok;
877
878 if (unlikely(cancel)) {
879 drbd_free_ee(mdev, e);
880 dec_unacked(mdev);
881 return 1;
882 }
883
884 if (get_ldev_if_state(mdev, D_FAILED)) {
885 drbd_rs_complete_io(mdev, e->sector);
886 put_ldev(mdev);
887 }
888
889 if (likely(drbd_bio_uptodate(e->private_bio))) {
890 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
891 inc_rs_pending(mdev);
892 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
893 } else {
894 if (__ratelimit(&drbd_ratelimit_state))
895 dev_err(DEV, "Not sending RSDataReply, "
896 "partner DISKLESS!\n");
897 ok = 1;
898 }
899 } else {
900 if (__ratelimit(&drbd_ratelimit_state))
901 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
902 (unsigned long long)e->sector);
903
904 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
905
906 /* update resync data with failure */
907 drbd_rs_failed_io(mdev, e->sector, e->size);
908 }
909
910 dec_unacked(mdev);
911
912 move_to_net_ee_or_free(mdev, e);
913
914 if (unlikely(!ok))
915 dev_err(DEV, "drbd_send_block() failed\n");
916 return ok;
917}
918
919int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
920{
921 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
922 struct digest_info *di;
923 int digest_size;
924 void *digest = NULL;
925 int ok, eq = 0;
926
927 if (unlikely(cancel)) {
928 drbd_free_ee(mdev, e);
929 dec_unacked(mdev);
930 return 1;
931 }
932
933 drbd_rs_complete_io(mdev, e->sector);
934
935 di = (struct digest_info *)(unsigned long)e->block_id;
936
937 if (likely(drbd_bio_uptodate(e->private_bio))) {
938 /* quick hack to try to avoid a race against reconfiguration.
939 * a real fix would be much more involved,
940 * introducing more locking mechanisms */
941 if (mdev->csums_tfm) {
942 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
943 D_ASSERT(digest_size == di->digest_size);
944 digest = kmalloc(digest_size, GFP_NOIO);
945 }
946 if (digest) {
947 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
948 eq = !memcmp(digest, di->digest, digest_size);
949 kfree(digest);
950 }
951
952 if (eq) {
953 drbd_set_in_sync(mdev, e->sector, e->size);
Lars Ellenberg676396d2010-03-03 02:08:22 +0100954 /* rs_same_csums unit is BM_BLOCK_SIZE */
955 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700956 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
957 } else {
958 inc_rs_pending(mdev);
959 e->block_id = ID_SYNCER;
960 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
961 }
962 } else {
963 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
964 if (__ratelimit(&drbd_ratelimit_state))
965 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
966 }
967
968 dec_unacked(mdev);
969
970 kfree(di);
971
972 move_to_net_ee_or_free(mdev, e);
973
974 if (unlikely(!ok))
975 dev_err(DEV, "drbd_send_block/ack() failed\n");
976 return ok;
977}
978
979int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
980{
981 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
982 int digest_size;
983 void *digest;
984 int ok = 1;
985
986 if (unlikely(cancel))
987 goto out;
988
989 if (unlikely(!drbd_bio_uptodate(e->private_bio)))
990 goto out;
991
992 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
993 /* FIXME if this allocation fails, online verify will not terminate! */
994 digest = kmalloc(digest_size, GFP_NOIO);
995 if (digest) {
996 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
997 inc_rs_pending(mdev);
998 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
999 digest, digest_size, P_OV_REPLY);
1000 if (!ok)
1001 dec_rs_pending(mdev);
1002 kfree(digest);
1003 }
1004
1005out:
1006 drbd_free_ee(mdev, e);
1007
1008 dec_unacked(mdev);
1009
1010 return ok;
1011}
1012
1013void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1014{
1015 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1016 mdev->ov_last_oos_size += size>>9;
1017 } else {
1018 mdev->ov_last_oos_start = sector;
1019 mdev->ov_last_oos_size = size>>9;
1020 }
1021 drbd_set_out_of_sync(mdev, sector, size);
1022 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1023}
1024
1025int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1026{
1027 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1028 struct digest_info *di;
1029 int digest_size;
1030 void *digest;
1031 int ok, eq = 0;
1032
1033 if (unlikely(cancel)) {
1034 drbd_free_ee(mdev, e);
1035 dec_unacked(mdev);
1036 return 1;
1037 }
1038
1039 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1040 * the resync lru has been cleaned up already */
1041 drbd_rs_complete_io(mdev, e->sector);
1042
1043 di = (struct digest_info *)(unsigned long)e->block_id;
1044
1045 if (likely(drbd_bio_uptodate(e->private_bio))) {
1046 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1047 digest = kmalloc(digest_size, GFP_NOIO);
1048 if (digest) {
1049 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1050
1051 D_ASSERT(digest_size == di->digest_size);
1052 eq = !memcmp(digest, di->digest, digest_size);
1053 kfree(digest);
1054 }
1055 } else {
1056 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1057 if (__ratelimit(&drbd_ratelimit_state))
1058 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1059 }
1060
1061 dec_unacked(mdev);
1062
1063 kfree(di);
1064
1065 if (!eq)
1066 drbd_ov_oos_found(mdev, e->sector, e->size);
1067 else
1068 ov_oos_print(mdev);
1069
1070 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1071 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1072
1073 drbd_free_ee(mdev, e);
1074
1075 if (--mdev->ov_left == 0) {
1076 ov_oos_print(mdev);
1077 drbd_resync_finished(mdev);
1078 }
1079
1080 return ok;
1081}
1082
1083int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1084{
1085 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1086 complete(&b->done);
1087 return 1;
1088}
1089
1090int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1091{
1092 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1093 struct p_barrier *p = &mdev->data.sbuf.barrier;
1094 int ok = 1;
1095
1096 /* really avoid racing with tl_clear. w.cb may have been referenced
1097 * just before it was reassigned and re-queued, so double check that.
1098 * actually, this race was harmless, since we only try to send the
1099 * barrier packet here, and otherwise do nothing with the object.
1100 * but compare with the head of w_clear_epoch */
1101 spin_lock_irq(&mdev->req_lock);
1102 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1103 cancel = 1;
1104 spin_unlock_irq(&mdev->req_lock);
1105 if (cancel)
1106 return 1;
1107
1108 if (!drbd_get_data_sock(mdev))
1109 return 0;
1110 p->barrier = b->br_number;
1111 /* inc_ap_pending was done where this was queued.
1112 * dec_ap_pending will be done in got_BarrierAck
1113 * or (on connection loss) in w_clear_epoch. */
1114 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1115 (struct p_header *)p, sizeof(*p), 0);
1116 drbd_put_data_sock(mdev);
1117
1118 return ok;
1119}
1120
1121int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122{
1123 if (cancel)
1124 return 1;
1125 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1126}
1127
1128/**
1129 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1130 * @mdev: DRBD device.
1131 * @w: work object.
1132 * @cancel: The connection will be closed anyways
1133 */
1134int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1135{
1136 struct drbd_request *req = container_of(w, struct drbd_request, w);
1137 int ok;
1138
1139 if (unlikely(cancel)) {
1140 req_mod(req, send_canceled);
1141 return 1;
1142 }
1143
1144 ok = drbd_send_dblock(mdev, req);
1145 req_mod(req, ok ? handed_over_to_network : send_failed);
1146
1147 return ok;
1148}
1149
1150/**
1151 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1152 * @mdev: DRBD device.
1153 * @w: work object.
1154 * @cancel: The connection will be closed anyways
1155 */
1156int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1157{
1158 struct drbd_request *req = container_of(w, struct drbd_request, w);
1159 int ok;
1160
1161 if (unlikely(cancel)) {
1162 req_mod(req, send_canceled);
1163 return 1;
1164 }
1165
1166 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1167 (unsigned long)req);
1168
1169 if (!ok) {
1170 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1171 * so this is probably redundant */
1172 if (mdev->state.conn >= C_CONNECTED)
1173 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1174 }
1175 req_mod(req, ok ? handed_over_to_network : send_failed);
1176
1177 return ok;
1178}
1179
1180static int _drbd_may_sync_now(struct drbd_conf *mdev)
1181{
1182 struct drbd_conf *odev = mdev;
1183
1184 while (1) {
1185 if (odev->sync_conf.after == -1)
1186 return 1;
1187 odev = minor_to_mdev(odev->sync_conf.after);
1188 ERR_IF(!odev) return 1;
1189 if ((odev->state.conn >= C_SYNC_SOURCE &&
1190 odev->state.conn <= C_PAUSED_SYNC_T) ||
1191 odev->state.aftr_isp || odev->state.peer_isp ||
1192 odev->state.user_isp)
1193 return 0;
1194 }
1195}
1196
1197/**
1198 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1199 * @mdev: DRBD device.
1200 *
1201 * Called from process context only (admin command and after_state_ch).
1202 */
1203static int _drbd_pause_after(struct drbd_conf *mdev)
1204{
1205 struct drbd_conf *odev;
1206 int i, rv = 0;
1207
1208 for (i = 0; i < minor_count; i++) {
1209 odev = minor_to_mdev(i);
1210 if (!odev)
1211 continue;
1212 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1213 continue;
1214 if (!_drbd_may_sync_now(odev))
1215 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1216 != SS_NOTHING_TO_DO);
1217 }
1218
1219 return rv;
1220}
1221
1222/**
1223 * _drbd_resume_next() - Resume resync on all devices that may resync now
1224 * @mdev: DRBD device.
1225 *
1226 * Called from process context only (admin command and worker).
1227 */
1228static int _drbd_resume_next(struct drbd_conf *mdev)
1229{
1230 struct drbd_conf *odev;
1231 int i, rv = 0;
1232
1233 for (i = 0; i < minor_count; i++) {
1234 odev = minor_to_mdev(i);
1235 if (!odev)
1236 continue;
1237 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1238 continue;
1239 if (odev->state.aftr_isp) {
1240 if (_drbd_may_sync_now(odev))
1241 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1242 CS_HARD, NULL)
1243 != SS_NOTHING_TO_DO) ;
1244 }
1245 }
1246 return rv;
1247}
1248
1249void resume_next_sg(struct drbd_conf *mdev)
1250{
1251 write_lock_irq(&global_state_lock);
1252 _drbd_resume_next(mdev);
1253 write_unlock_irq(&global_state_lock);
1254}
1255
1256void suspend_other_sg(struct drbd_conf *mdev)
1257{
1258 write_lock_irq(&global_state_lock);
1259 _drbd_pause_after(mdev);
1260 write_unlock_irq(&global_state_lock);
1261}
1262
1263static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1264{
1265 struct drbd_conf *odev;
1266
1267 if (o_minor == -1)
1268 return NO_ERROR;
1269 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1270 return ERR_SYNC_AFTER;
1271
1272 /* check for loops */
1273 odev = minor_to_mdev(o_minor);
1274 while (1) {
1275 if (odev == mdev)
1276 return ERR_SYNC_AFTER_CYCLE;
1277
1278 /* dependency chain ends here, no cycles. */
1279 if (odev->sync_conf.after == -1)
1280 return NO_ERROR;
1281
1282 /* follow the dependency chain */
1283 odev = minor_to_mdev(odev->sync_conf.after);
1284 }
1285}
1286
1287int drbd_alter_sa(struct drbd_conf *mdev, int na)
1288{
1289 int changes;
1290 int retcode;
1291
1292 write_lock_irq(&global_state_lock);
1293 retcode = sync_after_error(mdev, na);
1294 if (retcode == NO_ERROR) {
1295 mdev->sync_conf.after = na;
1296 do {
1297 changes = _drbd_pause_after(mdev);
1298 changes |= _drbd_resume_next(mdev);
1299 } while (changes);
1300 }
1301 write_unlock_irq(&global_state_lock);
1302 return retcode;
1303}
1304
Philipp Reisner309d1602010-03-02 15:03:44 +01001305static void ping_peer(struct drbd_conf *mdev)
1306{
1307 clear_bit(GOT_PING_ACK, &mdev->flags);
1308 request_ping(mdev);
1309 wait_event(mdev->misc_wait,
1310 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1311}
1312
Philipp Reisnerb411b362009-09-25 16:07:19 -07001313/**
1314 * drbd_start_resync() - Start the resync process
1315 * @mdev: DRBD device.
1316 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1317 *
1318 * This function might bring you directly into one of the
1319 * C_PAUSED_SYNC_* states.
1320 */
1321void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1322{
1323 union drbd_state ns;
1324 int r;
1325
1326 if (mdev->state.conn >= C_SYNC_SOURCE) {
1327 dev_err(DEV, "Resync already running!\n");
1328 return;
1329 }
1330
Philipp Reisnerb411b362009-09-25 16:07:19 -07001331 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1332 drbd_rs_cancel_all(mdev);
1333
1334 if (side == C_SYNC_TARGET) {
1335 /* Since application IO was locked out during C_WF_BITMAP_T and
1336 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1337 we check that we might make the data inconsistent. */
1338 r = drbd_khelper(mdev, "before-resync-target");
1339 r = (r >> 8) & 0xff;
1340 if (r > 0) {
1341 dev_info(DEV, "before-resync-target handler returned %d, "
1342 "dropping connection.\n", r);
1343 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1344 return;
1345 }
1346 }
1347
1348 drbd_state_lock(mdev);
1349
1350 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1351 drbd_state_unlock(mdev);
1352 return;
1353 }
1354
1355 if (side == C_SYNC_TARGET) {
1356 mdev->bm_resync_fo = 0;
1357 } else /* side == C_SYNC_SOURCE */ {
1358 u64 uuid;
1359
1360 get_random_bytes(&uuid, sizeof(u64));
1361 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1362 drbd_send_sync_uuid(mdev, uuid);
1363
1364 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1365 }
1366
1367 write_lock_irq(&global_state_lock);
1368 ns = mdev->state;
1369
1370 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1371
1372 ns.conn = side;
1373
1374 if (side == C_SYNC_TARGET)
1375 ns.disk = D_INCONSISTENT;
1376 else /* side == C_SYNC_SOURCE */
1377 ns.pdsk = D_INCONSISTENT;
1378
1379 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1380 ns = mdev->state;
1381
1382 if (ns.conn < C_CONNECTED)
1383 r = SS_UNKNOWN_ERROR;
1384
1385 if (r == SS_SUCCESS) {
1386 mdev->rs_total =
1387 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1388 mdev->rs_failed = 0;
1389 mdev->rs_paused = 0;
1390 mdev->rs_start =
1391 mdev->rs_mark_time = jiffies;
1392 mdev->rs_same_csum = 0;
1393 _drbd_pause_after(mdev);
1394 }
1395 write_unlock_irq(&global_state_lock);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001396 put_ldev(mdev);
1397
1398 if (r == SS_SUCCESS) {
1399 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1400 drbd_conn_str(ns.conn),
1401 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1402 (unsigned long) mdev->rs_total);
1403
1404 if (mdev->rs_total == 0) {
1405 /* Peer still reachable? Beware of failing before-resync-target handlers! */
Philipp Reisner309d1602010-03-02 15:03:44 +01001406 ping_peer(mdev);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001407 drbd_resync_finished(mdev);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001408 }
1409
1410 /* ns.conn may already be != mdev->state.conn,
1411 * we may have been paused in between, or become paused until
1412 * the timer triggers.
1413 * No matter, that is handled in resync_timer_fn() */
1414 if (ns.conn == C_SYNC_TARGET)
1415 mod_timer(&mdev->resync_timer, jiffies);
1416
1417 drbd_md_sync(mdev);
1418 }
Philipp Reisnerd0c3f602010-03-02 15:06:45 +01001419 drbd_state_unlock(mdev);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001420}
1421
1422int drbd_worker(struct drbd_thread *thi)
1423{
1424 struct drbd_conf *mdev = thi->mdev;
1425 struct drbd_work *w = NULL;
1426 LIST_HEAD(work_list);
1427 int intr = 0, i;
1428
1429 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1430
1431 while (get_t_state(thi) == Running) {
1432 drbd_thread_current_set_cpu(mdev);
1433
1434 if (down_trylock(&mdev->data.work.s)) {
1435 mutex_lock(&mdev->data.mutex);
1436 if (mdev->data.socket && !mdev->net_conf->no_cork)
1437 drbd_tcp_uncork(mdev->data.socket);
1438 mutex_unlock(&mdev->data.mutex);
1439
1440 intr = down_interruptible(&mdev->data.work.s);
1441
1442 mutex_lock(&mdev->data.mutex);
1443 if (mdev->data.socket && !mdev->net_conf->no_cork)
1444 drbd_tcp_cork(mdev->data.socket);
1445 mutex_unlock(&mdev->data.mutex);
1446 }
1447
1448 if (intr) {
1449 D_ASSERT(intr == -EINTR);
1450 flush_signals(current);
1451 ERR_IF (get_t_state(thi) == Running)
1452 continue;
1453 break;
1454 }
1455
1456 if (get_t_state(thi) != Running)
1457 break;
1458 /* With this break, we have done a down() but not consumed
1459 the entry from the list. The cleanup code takes care of
1460 this... */
1461
1462 w = NULL;
1463 spin_lock_irq(&mdev->data.work.q_lock);
1464 ERR_IF(list_empty(&mdev->data.work.q)) {
1465 /* something terribly wrong in our logic.
1466 * we were able to down() the semaphore,
1467 * but the list is empty... doh.
1468 *
1469 * what is the best thing to do now?
1470 * try again from scratch, restarting the receiver,
1471 * asender, whatnot? could break even more ugly,
1472 * e.g. when we are primary, but no good local data.
1473 *
1474 * I'll try to get away just starting over this loop.
1475 */
1476 spin_unlock_irq(&mdev->data.work.q_lock);
1477 continue;
1478 }
1479 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1480 list_del_init(&w->list);
1481 spin_unlock_irq(&mdev->data.work.q_lock);
1482
1483 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1484 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1485 if (mdev->state.conn >= C_CONNECTED)
1486 drbd_force_state(mdev,
1487 NS(conn, C_NETWORK_FAILURE));
1488 }
1489 }
1490 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1491 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1492
1493 spin_lock_irq(&mdev->data.work.q_lock);
1494 i = 0;
1495 while (!list_empty(&mdev->data.work.q)) {
1496 list_splice_init(&mdev->data.work.q, &work_list);
1497 spin_unlock_irq(&mdev->data.work.q_lock);
1498
1499 while (!list_empty(&work_list)) {
1500 w = list_entry(work_list.next, struct drbd_work, list);
1501 list_del_init(&w->list);
1502 w->cb(mdev, w, 1);
1503 i++; /* dead debugging code */
1504 }
1505
1506 spin_lock_irq(&mdev->data.work.q_lock);
1507 }
1508 sema_init(&mdev->data.work.s, 0);
1509 /* DANGEROUS race: if someone did queue his work within the spinlock,
1510 * but up() ed outside the spinlock, we could get an up() on the
1511 * semaphore without corresponding list entry.
1512 * So don't do that.
1513 */
1514 spin_unlock_irq(&mdev->data.work.q_lock);
1515
1516 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1517 /* _drbd_set_state only uses stop_nowait.
1518 * wait here for the Exiting receiver. */
1519 drbd_thread_stop(&mdev->receiver);
1520 drbd_mdev_cleanup(mdev);
1521
1522 dev_info(DEV, "worker terminated\n");
1523
1524 clear_bit(DEVICE_DYING, &mdev->flags);
1525 clear_bit(CONFIG_PENDING, &mdev->flags);
1526 wake_up(&mdev->state_wait);
1527
1528 return 0;
1529}