blob: 212e9545e634a5f7aa6828ab66f4cd752584f2eb [file] [log] [blame]
Philipp Reisnerb411b362009-09-25 16:07:19 -07001/*
2 drbd_worker.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/autoconf.h>
27#include <linux/module.h>
28#include <linux/version.h>
29#include <linux/drbd.h>
30#include <linux/sched.h>
31#include <linux/smp_lock.h>
32#include <linux/wait.h>
33#include <linux/mm.h>
34#include <linux/memcontrol.h>
35#include <linux/mm_inline.h>
36#include <linux/slab.h>
37#include <linux/random.h>
38#include <linux/mm.h>
39#include <linux/string.h>
40#include <linux/scatterlist.h>
41
42#include "drbd_int.h"
43#include "drbd_req.h"
44#include "drbd_tracing.h"
45
46#define SLEEP_TIME (HZ/10)
47
48static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
49
50
51
52/* defined here:
53 drbd_md_io_complete
54 drbd_endio_write_sec
55 drbd_endio_read_sec
56 drbd_endio_pri
57
58 * more endio handlers:
59 atodb_endio in drbd_actlog.c
60 drbd_bm_async_io_complete in drbd_bitmap.c
61
62 * For all these callbacks, note the following:
63 * The callbacks will be called in irq context by the IDE drivers,
64 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
65 * Try to get the locking right :)
66 *
67 */
68
69
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
74rwlock_t global_state_lock;
75
76/* used for synchronous meta data and bitmap IO
77 * submitted by drbd_md_sync_page_io()
78 */
79void drbd_md_io_complete(struct bio *bio, int error)
80{
81 struct drbd_md_io *md_io;
82
83 md_io = (struct drbd_md_io *)bio->bi_private;
84 md_io->error = error;
85
86 trace_drbd_bio(md_io->mdev, "Md", bio, 1, NULL);
87
88 complete(&md_io->event);
89}
90
91/* reads on behalf of the partner,
92 * "submitted" by the receiver
93 */
94void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
95{
96 unsigned long flags = 0;
97 struct drbd_epoch_entry *e = NULL;
98 struct drbd_conf *mdev;
99 int uptodate = bio_flagged(bio, BIO_UPTODATE);
100
101 e = bio->bi_private;
102 mdev = e->mdev;
103
104 if (error)
105 dev_warn(DEV, "read: error=%d s=%llus\n", error,
106 (unsigned long long)e->sector);
107 if (!error && !uptodate) {
108 dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
109 (unsigned long long)e->sector);
110 /* strange behavior of some lower level drivers...
111 * fail the request by clearing the uptodate flag,
112 * but do not return any error?! */
113 error = -EIO;
114 }
115
116 D_ASSERT(e->block_id != ID_VACANT);
117
118 trace_drbd_bio(mdev, "Sec", bio, 1, NULL);
119
120 spin_lock_irqsave(&mdev->req_lock, flags);
121 mdev->read_cnt += e->size >> 9;
122 list_del(&e->w.list);
123 if (list_empty(&mdev->read_ee))
124 wake_up(&mdev->ee_wait);
125 spin_unlock_irqrestore(&mdev->req_lock, flags);
126
127 drbd_chk_io_error(mdev, error, FALSE);
128 drbd_queue_work(&mdev->data.work, &e->w);
129 put_ldev(mdev);
130
131 trace_drbd_ee(mdev, e, "read completed");
132}
133
134/* writes on behalf of the partner, or resync writes,
135 * "submitted" by the receiver.
136 */
137void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
138{
139 unsigned long flags = 0;
140 struct drbd_epoch_entry *e = NULL;
141 struct drbd_conf *mdev;
142 sector_t e_sector;
143 int do_wake;
144 int is_syncer_req;
145 int do_al_complete_io;
146 int uptodate = bio_flagged(bio, BIO_UPTODATE);
147 int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
148
149 e = bio->bi_private;
150 mdev = e->mdev;
151
152 if (error)
153 dev_warn(DEV, "write: error=%d s=%llus\n", error,
154 (unsigned long long)e->sector);
155 if (!error && !uptodate) {
156 dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
157 (unsigned long long)e->sector);
158 /* strange behavior of some lower level drivers...
159 * fail the request by clearing the uptodate flag,
160 * but do not return any error?! */
161 error = -EIO;
162 }
163
164 /* error == -ENOTSUPP would be a better test,
165 * alas it is not reliable */
166 if (error && is_barrier && e->flags & EE_IS_BARRIER) {
167 drbd_bump_write_ordering(mdev, WO_bdev_flush);
168 spin_lock_irqsave(&mdev->req_lock, flags);
169 list_del(&e->w.list);
170 e->w.cb = w_e_reissue;
171 /* put_ldev actually happens below, once we come here again. */
172 __release(local);
173 spin_unlock_irqrestore(&mdev->req_lock, flags);
174 drbd_queue_work(&mdev->data.work, &e->w);
175 return;
176 }
177
178 D_ASSERT(e->block_id != ID_VACANT);
179
180 trace_drbd_bio(mdev, "Sec", bio, 1, NULL);
181
182 spin_lock_irqsave(&mdev->req_lock, flags);
183 mdev->writ_cnt += e->size >> 9;
184 is_syncer_req = is_syncer_block_id(e->block_id);
185
186 /* after we moved e to done_ee,
187 * we may no longer access it,
188 * it may be freed/reused already!
189 * (as soon as we release the req_lock) */
190 e_sector = e->sector;
191 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
192
193 list_del(&e->w.list); /* has been on active_ee or sync_ee */
194 list_add_tail(&e->w.list, &mdev->done_ee);
195
196 trace_drbd_ee(mdev, e, "write completed");
197
198 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
199 * neither did we wake possibly waiting conflicting requests.
200 * done from "drbd_process_done_ee" within the appropriate w.cb
201 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
202
203 do_wake = is_syncer_req
204 ? list_empty(&mdev->sync_ee)
205 : list_empty(&mdev->active_ee);
206
207 if (error)
208 __drbd_chk_io_error(mdev, FALSE);
209 spin_unlock_irqrestore(&mdev->req_lock, flags);
210
211 if (is_syncer_req)
212 drbd_rs_complete_io(mdev, e_sector);
213
214 if (do_wake)
215 wake_up(&mdev->ee_wait);
216
217 if (do_al_complete_io)
218 drbd_al_complete_io(mdev, e_sector);
219
220 wake_asender(mdev);
221 put_ldev(mdev);
222
223}
224
225/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
226 */
227void drbd_endio_pri(struct bio *bio, int error)
228{
229 unsigned long flags;
230 struct drbd_request *req = bio->bi_private;
231 struct drbd_conf *mdev = req->mdev;
232 struct bio_and_error m;
233 enum drbd_req_event what;
234 int uptodate = bio_flagged(bio, BIO_UPTODATE);
235
236 if (error)
237 dev_warn(DEV, "p %s: error=%d\n",
238 bio_data_dir(bio) == WRITE ? "write" : "read", error);
239 if (!error && !uptodate) {
240 dev_warn(DEV, "p %s: setting error to -EIO\n",
241 bio_data_dir(bio) == WRITE ? "write" : "read");
242 /* strange behavior of some lower level drivers...
243 * fail the request by clearing the uptodate flag,
244 * but do not return any error?! */
245 error = -EIO;
246 }
247
248 trace_drbd_bio(mdev, "Pri", bio, 1, NULL);
249
250 /* to avoid recursion in __req_mod */
251 if (unlikely(error)) {
252 what = (bio_data_dir(bio) == WRITE)
253 ? write_completed_with_error
254 : (bio_rw(bio) == READA)
255 ? read_completed_with_error
256 : read_ahead_completed_with_error;
257 } else
258 what = completed_ok;
259
260 bio_put(req->private_bio);
261 req->private_bio = ERR_PTR(error);
262
263 spin_lock_irqsave(&mdev->req_lock, flags);
264 __req_mod(req, what, &m);
265 spin_unlock_irqrestore(&mdev->req_lock, flags);
266
267 if (m.bio)
268 complete_master_bio(mdev, &m);
269}
270
271int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
272{
273 struct drbd_request *req = container_of(w, struct drbd_request, w);
274
275 /* NOTE: mdev->ldev can be NULL by the time we get here! */
276 /* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
277
278 /* the only way this callback is scheduled is from _req_may_be_done,
279 * when it is done and had a local write error, see comments there */
280 drbd_req_free(req);
281
282 return TRUE;
283}
284
285int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
286{
287 struct drbd_request *req = container_of(w, struct drbd_request, w);
288
289 /* We should not detach for read io-error,
290 * but try to WRITE the P_DATA_REPLY to the failed location,
291 * to give the disk the chance to relocate that block */
292
293 spin_lock_irq(&mdev->req_lock);
294 if (cancel ||
295 mdev->state.conn < C_CONNECTED ||
296 mdev->state.pdsk <= D_INCONSISTENT) {
297 _req_mod(req, send_canceled);
298 spin_unlock_irq(&mdev->req_lock);
299 dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
300 return 1;
301 }
302 spin_unlock_irq(&mdev->req_lock);
303
304 return w_send_read_req(mdev, w, 0);
305}
306
307int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308{
309 ERR_IF(cancel) return 1;
310 dev_err(DEV, "resync inactive, but callback triggered??\n");
311 return 1; /* Simply ignore this! */
312}
313
314void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
315{
316 struct hash_desc desc;
317 struct scatterlist sg;
318 struct bio_vec *bvec;
319 int i;
320
321 desc.tfm = tfm;
322 desc.flags = 0;
323
324 sg_init_table(&sg, 1);
325 crypto_hash_init(&desc);
326
327 __bio_for_each_segment(bvec, bio, i, 0) {
328 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
329 crypto_hash_update(&desc, &sg, sg.length);
330 }
331 crypto_hash_final(&desc, digest);
332}
333
334static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
335{
336 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
337 int digest_size;
338 void *digest;
339 int ok;
340
341 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
342
343 if (unlikely(cancel)) {
344 drbd_free_ee(mdev, e);
345 return 1;
346 }
347
348 if (likely(drbd_bio_uptodate(e->private_bio))) {
349 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
350 digest = kmalloc(digest_size, GFP_NOIO);
351 if (digest) {
352 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
353
354 inc_rs_pending(mdev);
355 ok = drbd_send_drequest_csum(mdev,
356 e->sector,
357 e->size,
358 digest,
359 digest_size,
360 P_CSUM_RS_REQUEST);
361 kfree(digest);
362 } else {
363 dev_err(DEV, "kmalloc() of digest failed.\n");
364 ok = 0;
365 }
366 } else
367 ok = 1;
368
369 drbd_free_ee(mdev, e);
370
371 if (unlikely(!ok))
372 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
373 return ok;
374}
375
376#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
377
378static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
379{
380 struct drbd_epoch_entry *e;
381
382 if (!get_ldev(mdev))
383 return 0;
384
385 /* GFP_TRY, because if there is no memory available right now, this may
386 * be rescheduled for later. It is "only" background resync, after all. */
387 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
388 if (!e) {
389 put_ldev(mdev);
390 return 2;
391 }
392
393 spin_lock_irq(&mdev->req_lock);
394 list_add(&e->w.list, &mdev->read_ee);
395 spin_unlock_irq(&mdev->req_lock);
396
397 e->private_bio->bi_end_io = drbd_endio_read_sec;
398 e->private_bio->bi_rw = READ;
399 e->w.cb = w_e_send_csum;
400
401 mdev->read_cnt += size >> 9;
402 drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
403
404 return 1;
405}
406
407void resync_timer_fn(unsigned long data)
408{
409 unsigned long flags;
410 struct drbd_conf *mdev = (struct drbd_conf *) data;
411 int queue;
412
413 spin_lock_irqsave(&mdev->req_lock, flags);
414
415 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
416 queue = 1;
417 if (mdev->state.conn == C_VERIFY_S)
418 mdev->resync_work.cb = w_make_ov_request;
419 else
420 mdev->resync_work.cb = w_make_resync_request;
421 } else {
422 queue = 0;
423 mdev->resync_work.cb = w_resync_inactive;
424 }
425
426 spin_unlock_irqrestore(&mdev->req_lock, flags);
427
428 /* harmless race: list_empty outside data.work.q_lock */
429 if (list_empty(&mdev->resync_work.list) && queue)
430 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
431}
432
433int w_make_resync_request(struct drbd_conf *mdev,
434 struct drbd_work *w, int cancel)
435{
436 unsigned long bit;
437 sector_t sector;
438 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
439 int max_segment_size = queue_max_segment_size(mdev->rq_queue);
440 int number, i, size, pe, mx;
441 int align, queued, sndbuf;
442
443 if (unlikely(cancel))
444 return 1;
445
446 if (unlikely(mdev->state.conn < C_CONNECTED)) {
447 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
448 return 0;
449 }
450
451 if (mdev->state.conn != C_SYNC_TARGET)
452 dev_err(DEV, "%s in w_make_resync_request\n",
453 drbd_conn_str(mdev->state.conn));
454
455 if (!get_ldev(mdev)) {
456 /* Since we only need to access mdev->rsync a
457 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
458 to continue resync with a broken disk makes no sense at
459 all */
460 dev_err(DEV, "Disk broke down during resync!\n");
461 mdev->resync_work.cb = w_resync_inactive;
462 return 1;
463 }
464
465 number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
466 pe = atomic_read(&mdev->rs_pending_cnt);
467
468 mutex_lock(&mdev->data.mutex);
469 if (mdev->data.socket)
470 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
471 else
472 mx = 1;
473 mutex_unlock(&mdev->data.mutex);
474
475 /* For resync rates >160MB/sec, allow more pending RS requests */
476 if (number > mx)
477 mx = number;
478
479 /* Limit the number of pending RS requests to no more than the peer's receive buffer */
480 if ((pe + number) > mx) {
481 number = mx - pe;
482 }
483
484 for (i = 0; i < number; i++) {
485 /* Stop generating RS requests, when half of the send buffer is filled */
486 mutex_lock(&mdev->data.mutex);
487 if (mdev->data.socket) {
488 queued = mdev->data.socket->sk->sk_wmem_queued;
489 sndbuf = mdev->data.socket->sk->sk_sndbuf;
490 } else {
491 queued = 1;
492 sndbuf = 0;
493 }
494 mutex_unlock(&mdev->data.mutex);
495 if (queued > sndbuf / 2)
496 goto requeue;
497
498next_sector:
499 size = BM_BLOCK_SIZE;
500 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
501
502 if (bit == -1UL) {
503 mdev->bm_resync_fo = drbd_bm_bits(mdev);
504 mdev->resync_work.cb = w_resync_inactive;
505 put_ldev(mdev);
506 return 1;
507 }
508
509 sector = BM_BIT_TO_SECT(bit);
510
511 if (drbd_try_rs_begin_io(mdev, sector)) {
512 mdev->bm_resync_fo = bit;
513 goto requeue;
514 }
515 mdev->bm_resync_fo = bit + 1;
516
517 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
518 drbd_rs_complete_io(mdev, sector);
519 goto next_sector;
520 }
521
522#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
523 /* try to find some adjacent bits.
524 * we stop if we have already the maximum req size.
525 *
526 * Additionally always align bigger requests, in order to
527 * be prepared for all stripe sizes of software RAIDs.
528 *
529 * we _do_ care about the agreed-upon q->max_segment_size
530 * here, as splitting up the requests on the other side is more
531 * difficult. the consequence is, that on lvm and md and other
532 * "indirect" devices, this is dead code, since
533 * q->max_segment_size will be PAGE_SIZE.
534 */
535 align = 1;
536 for (;;) {
537 if (size + BM_BLOCK_SIZE > max_segment_size)
538 break;
539
540 /* Be always aligned */
541 if (sector & ((1<<(align+3))-1))
542 break;
543
544 /* do not cross extent boundaries */
545 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
546 break;
547 /* now, is it actually dirty, after all?
548 * caution, drbd_bm_test_bit is tri-state for some
549 * obscure reason; ( b == 0 ) would get the out-of-band
550 * only accidentally right because of the "oddly sized"
551 * adjustment below */
552 if (drbd_bm_test_bit(mdev, bit+1) != 1)
553 break;
554 bit++;
555 size += BM_BLOCK_SIZE;
556 if ((BM_BLOCK_SIZE << align) <= size)
557 align++;
558 i++;
559 }
560 /* if we merged some,
561 * reset the offset to start the next drbd_bm_find_next from */
562 if (size > BM_BLOCK_SIZE)
563 mdev->bm_resync_fo = bit + 1;
564#endif
565
566 /* adjust very last sectors, in case we are oddly sized */
567 if (sector + (size>>9) > capacity)
568 size = (capacity-sector)<<9;
569 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
570 switch (read_for_csum(mdev, sector, size)) {
571 case 0: /* Disk failure*/
572 put_ldev(mdev);
573 return 0;
574 case 2: /* Allocation failed */
575 drbd_rs_complete_io(mdev, sector);
576 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
577 goto requeue;
578 /* case 1: everything ok */
579 }
580 } else {
581 inc_rs_pending(mdev);
582 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
583 sector, size, ID_SYNCER)) {
584 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
585 dec_rs_pending(mdev);
586 put_ldev(mdev);
587 return 0;
588 }
589 }
590 }
591
592 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
593 /* last syncer _request_ was sent,
594 * but the P_RS_DATA_REPLY not yet received. sync will end (and
595 * next sync group will resume), as soon as we receive the last
596 * resync data block, and the last bit is cleared.
597 * until then resync "work" is "inactive" ...
598 */
599 mdev->resync_work.cb = w_resync_inactive;
600 put_ldev(mdev);
601 return 1;
602 }
603
604 requeue:
605 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
606 put_ldev(mdev);
607 return 1;
608}
609
610static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
611{
612 int number, i, size;
613 sector_t sector;
614 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
615
616 if (unlikely(cancel))
617 return 1;
618
619 if (unlikely(mdev->state.conn < C_CONNECTED)) {
620 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
621 return 0;
622 }
623
624 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
625 if (atomic_read(&mdev->rs_pending_cnt) > number)
626 goto requeue;
627
628 number -= atomic_read(&mdev->rs_pending_cnt);
629
630 sector = mdev->ov_position;
631 for (i = 0; i < number; i++) {
632 if (sector >= capacity) {
633 mdev->resync_work.cb = w_resync_inactive;
634 return 1;
635 }
636
637 size = BM_BLOCK_SIZE;
638
639 if (drbd_try_rs_begin_io(mdev, sector)) {
640 mdev->ov_position = sector;
641 goto requeue;
642 }
643
644 if (sector + (size>>9) > capacity)
645 size = (capacity-sector)<<9;
646
647 inc_rs_pending(mdev);
648 if (!drbd_send_ov_request(mdev, sector, size)) {
649 dec_rs_pending(mdev);
650 return 0;
651 }
652 sector += BM_SECT_PER_BIT;
653 }
654 mdev->ov_position = sector;
655
656 requeue:
657 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
658 return 1;
659}
660
661
/*
 * Worker callback: conclude an online verify run.
 * Frees the work item @w, calls ov_oos_print() and then
 * drbd_resync_finished().  Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}
670
/*
 * Worker callback queued by drbd_resync_finished() to retry itself later.
 * Frees the work item @w (it was kmalloc'ed by the one who queued it) and
 * calls drbd_resync_finished() again.  Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	drbd_resync_finished(mdev);
	return 1;
}
679
680int drbd_resync_finished(struct drbd_conf *mdev)
681{
682 unsigned long db, dt, dbdt;
683 unsigned long n_oos;
684 union drbd_state os, ns;
685 struct drbd_work *w;
686 char *khelper_cmd = NULL;
687
688 /* Remove all elements from the resync LRU. Since future actions
689 * might set bits in the (main) bitmap, then the entries in the
690 * resync LRU would be wrong. */
691 if (drbd_rs_del_all(mdev)) {
692 /* In case this is not possible now, most probably because
693 * there are P_RS_DATA_REPLY Packets lingering on the worker's
694 * queue (or even the read operations for those packets
695 * is not finished by now). Retry in 100ms. */
696
697 drbd_kick_lo(mdev);
698 __set_current_state(TASK_INTERRUPTIBLE);
699 schedule_timeout(HZ / 10);
700 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
701 if (w) {
702 w->cb = w_resync_finished;
703 drbd_queue_work(&mdev->data.work, w);
704 return 1;
705 }
706 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
707 }
708
709 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
710 if (dt <= 0)
711 dt = 1;
712 db = mdev->rs_total;
713 dbdt = Bit2KB(db/dt);
714 mdev->rs_paused /= HZ;
715
716 if (!get_ldev(mdev))
717 goto out;
718
719 spin_lock_irq(&mdev->req_lock);
720 os = mdev->state;
721
722 /* This protects us against multiple calls (that can happen in the presence
723 of application IO), and against connectivity loss just before we arrive here. */
724 if (os.conn <= C_CONNECTED)
725 goto out_unlock;
726
727 ns = os;
728 ns.conn = C_CONNECTED;
729
730 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
731 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
732 "Online verify " : "Resync",
733 dt + mdev->rs_paused, mdev->rs_paused, dbdt);
734
735 n_oos = drbd_bm_total_weight(mdev);
736
737 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
738 if (n_oos) {
739 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
740 n_oos, Bit2KB(1));
741 khelper_cmd = "out-of-sync";
742 }
743 } else {
744 D_ASSERT((n_oos - mdev->rs_failed) == 0);
745
746 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
747 khelper_cmd = "after-resync-target";
748
749 if (mdev->csums_tfm && mdev->rs_total) {
750 const unsigned long s = mdev->rs_same_csum;
751 const unsigned long t = mdev->rs_total;
752 const int ratio =
753 (t == 0) ? 0 :
754 (t < 100000) ? ((s*100)/t) : (s/(t/100));
755 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
756 "transferred %luK total %luK\n",
757 ratio,
758 Bit2KB(mdev->rs_same_csum),
759 Bit2KB(mdev->rs_total - mdev->rs_same_csum),
760 Bit2KB(mdev->rs_total));
761 }
762 }
763
764 if (mdev->rs_failed) {
765 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);
766
767 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
768 ns.disk = D_INCONSISTENT;
769 ns.pdsk = D_UP_TO_DATE;
770 } else {
771 ns.disk = D_UP_TO_DATE;
772 ns.pdsk = D_INCONSISTENT;
773 }
774 } else {
775 ns.disk = D_UP_TO_DATE;
776 ns.pdsk = D_UP_TO_DATE;
777
778 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
779 if (mdev->p_uuid) {
780 int i;
781 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
782 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
783 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
784 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
785 } else {
786 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
787 }
788 }
789
790 drbd_uuid_set_bm(mdev, 0UL);
791
792 if (mdev->p_uuid) {
793 /* Now the two UUID sets are equal, update what we
794 * know of the peer. */
795 int i;
796 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
797 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
798 }
799 }
800
801 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
802out_unlock:
803 spin_unlock_irq(&mdev->req_lock);
804 put_ldev(mdev);
805out:
806 mdev->rs_total = 0;
807 mdev->rs_failed = 0;
808 mdev->rs_paused = 0;
809 mdev->ov_start_sector = 0;
810
811 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
812 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
813 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
814 }
815
816 if (khelper_cmd)
817 drbd_khelper(mdev, khelper_cmd);
818
819 return 1;
820}
821
822/* helper */
823static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
824{
825 if (drbd_bio_has_active_page(e->private_bio)) {
826 /* This might happen if sendpage() has not finished */
827 spin_lock_irq(&mdev->req_lock);
828 list_add_tail(&e->w.list, &mdev->net_ee);
829 spin_unlock_irq(&mdev->req_lock);
830 } else
831 drbd_free_ee(mdev, e);
832}
833
834/**
835 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
836 * @mdev: DRBD device.
837 * @w: work object.
838 * @cancel: The connection will be closed anyways
839 */
840int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
841{
842 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
843 int ok;
844
845 if (unlikely(cancel)) {
846 drbd_free_ee(mdev, e);
847 dec_unacked(mdev);
848 return 1;
849 }
850
851 if (likely(drbd_bio_uptodate(e->private_bio))) {
852 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
853 } else {
854 if (__ratelimit(&drbd_ratelimit_state))
855 dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
856 (unsigned long long)e->sector);
857
858 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
859 }
860
861 dec_unacked(mdev);
862
863 move_to_net_ee_or_free(mdev, e);
864
865 if (unlikely(!ok))
866 dev_err(DEV, "drbd_send_block() failed\n");
867 return ok;
868}
869
870/**
871 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
872 * @mdev: DRBD device.
873 * @w: work object.
874 * @cancel: The connection will be closed anyways
875 */
876int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
877{
878 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
879 int ok;
880
881 if (unlikely(cancel)) {
882 drbd_free_ee(mdev, e);
883 dec_unacked(mdev);
884 return 1;
885 }
886
887 if (get_ldev_if_state(mdev, D_FAILED)) {
888 drbd_rs_complete_io(mdev, e->sector);
889 put_ldev(mdev);
890 }
891
892 if (likely(drbd_bio_uptodate(e->private_bio))) {
893 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
894 inc_rs_pending(mdev);
895 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
896 } else {
897 if (__ratelimit(&drbd_ratelimit_state))
898 dev_err(DEV, "Not sending RSDataReply, "
899 "partner DISKLESS!\n");
900 ok = 1;
901 }
902 } else {
903 if (__ratelimit(&drbd_ratelimit_state))
904 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
905 (unsigned long long)e->sector);
906
907 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
908
909 /* update resync data with failure */
910 drbd_rs_failed_io(mdev, e->sector, e->size);
911 }
912
913 dec_unacked(mdev);
914
915 move_to_net_ee_or_free(mdev, e);
916
917 if (unlikely(!ok))
918 dev_err(DEV, "drbd_send_block() failed\n");
919 return ok;
920}
921
922int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
923{
924 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
925 struct digest_info *di;
926 int digest_size;
927 void *digest = NULL;
928 int ok, eq = 0;
929
930 if (unlikely(cancel)) {
931 drbd_free_ee(mdev, e);
932 dec_unacked(mdev);
933 return 1;
934 }
935
936 drbd_rs_complete_io(mdev, e->sector);
937
938 di = (struct digest_info *)(unsigned long)e->block_id;
939
940 if (likely(drbd_bio_uptodate(e->private_bio))) {
941 /* quick hack to try to avoid a race against reconfiguration.
942 * a real fix would be much more involved,
943 * introducing more locking mechanisms */
944 if (mdev->csums_tfm) {
945 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
946 D_ASSERT(digest_size == di->digest_size);
947 digest = kmalloc(digest_size, GFP_NOIO);
948 }
949 if (digest) {
950 drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
951 eq = !memcmp(digest, di->digest, digest_size);
952 kfree(digest);
953 }
954
955 if (eq) {
956 drbd_set_in_sync(mdev, e->sector, e->size);
957 mdev->rs_same_csum++;
958 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
959 } else {
960 inc_rs_pending(mdev);
961 e->block_id = ID_SYNCER;
962 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
963 }
964 } else {
965 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
966 if (__ratelimit(&drbd_ratelimit_state))
967 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
968 }
969
970 dec_unacked(mdev);
971
972 kfree(di);
973
974 move_to_net_ee_or_free(mdev, e);
975
976 if (unlikely(!ok))
977 dev_err(DEV, "drbd_send_block/ack() failed\n");
978 return ok;
979}
980
981int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
982{
983 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
984 int digest_size;
985 void *digest;
986 int ok = 1;
987
988 if (unlikely(cancel))
989 goto out;
990
991 if (unlikely(!drbd_bio_uptodate(e->private_bio)))
992 goto out;
993
994 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
995 /* FIXME if this allocation fails, online verify will not terminate! */
996 digest = kmalloc(digest_size, GFP_NOIO);
997 if (digest) {
998 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
999 inc_rs_pending(mdev);
1000 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1001 digest, digest_size, P_OV_REPLY);
1002 if (!ok)
1003 dec_rs_pending(mdev);
1004 kfree(digest);
1005 }
1006
1007out:
1008 drbd_free_ee(mdev, e);
1009
1010 dec_unacked(mdev);
1011
1012 return ok;
1013}
1014
1015void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1016{
1017 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1018 mdev->ov_last_oos_size += size>>9;
1019 } else {
1020 mdev->ov_last_oos_start = sector;
1021 mdev->ov_last_oos_size = size>>9;
1022 }
1023 drbd_set_out_of_sync(mdev, sector, size);
1024 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1025}
1026
1027int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1028{
1029 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1030 struct digest_info *di;
1031 int digest_size;
1032 void *digest;
1033 int ok, eq = 0;
1034
1035 if (unlikely(cancel)) {
1036 drbd_free_ee(mdev, e);
1037 dec_unacked(mdev);
1038 return 1;
1039 }
1040
1041 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1042 * the resync lru has been cleaned up already */
1043 drbd_rs_complete_io(mdev, e->sector);
1044
1045 di = (struct digest_info *)(unsigned long)e->block_id;
1046
1047 if (likely(drbd_bio_uptodate(e->private_bio))) {
1048 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1049 digest = kmalloc(digest_size, GFP_NOIO);
1050 if (digest) {
1051 drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1052
1053 D_ASSERT(digest_size == di->digest_size);
1054 eq = !memcmp(digest, di->digest, digest_size);
1055 kfree(digest);
1056 }
1057 } else {
1058 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1059 if (__ratelimit(&drbd_ratelimit_state))
1060 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1061 }
1062
1063 dec_unacked(mdev);
1064
1065 kfree(di);
1066
1067 if (!eq)
1068 drbd_ov_oos_found(mdev, e->sector, e->size);
1069 else
1070 ov_oos_print(mdev);
1071
1072 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1073 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1074
1075 drbd_free_ee(mdev, e);
1076
1077 if (--mdev->ov_left == 0) {
1078 ov_oos_print(mdev);
1079 drbd_resync_finished(mdev);
1080 }
1081
1082 return ok;
1083}
1084
1085int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086{
1087 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1088 complete(&b->done);
1089 return 1;
1090}
1091
1092int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1093{
1094 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1095 struct p_barrier *p = &mdev->data.sbuf.barrier;
1096 int ok = 1;
1097
1098 /* really avoid racing with tl_clear. w.cb may have been referenced
1099 * just before it was reassigned and re-queued, so double check that.
1100 * actually, this race was harmless, since we only try to send the
1101 * barrier packet here, and otherwise do nothing with the object.
1102 * but compare with the head of w_clear_epoch */
1103 spin_lock_irq(&mdev->req_lock);
1104 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1105 cancel = 1;
1106 spin_unlock_irq(&mdev->req_lock);
1107 if (cancel)
1108 return 1;
1109
1110 if (!drbd_get_data_sock(mdev))
1111 return 0;
1112 p->barrier = b->br_number;
1113 /* inc_ap_pending was done where this was queued.
1114 * dec_ap_pending will be done in got_BarrierAck
1115 * or (on connection loss) in w_clear_epoch. */
1116 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1117 (struct p_header *)p, sizeof(*p), 0);
1118 drbd_put_data_sock(mdev);
1119
1120 return ok;
1121}
1122
1123int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1124{
1125 if (cancel)
1126 return 1;
1127 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1128}
1129
1130/**
1131 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1132 * @mdev: DRBD device.
1133 * @w: work object.
1134 * @cancel: The connection will be closed anyways
1135 */
1136int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1137{
1138 struct drbd_request *req = container_of(w, struct drbd_request, w);
1139 int ok;
1140
1141 if (unlikely(cancel)) {
1142 req_mod(req, send_canceled);
1143 return 1;
1144 }
1145
1146 ok = drbd_send_dblock(mdev, req);
1147 req_mod(req, ok ? handed_over_to_network : send_failed);
1148
1149 return ok;
1150}
1151
1152/**
1153 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1154 * @mdev: DRBD device.
1155 * @w: work object.
1156 * @cancel: The connection will be closed anyways
1157 */
1158int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1159{
1160 struct drbd_request *req = container_of(w, struct drbd_request, w);
1161 int ok;
1162
1163 if (unlikely(cancel)) {
1164 req_mod(req, send_canceled);
1165 return 1;
1166 }
1167
1168 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1169 (unsigned long)req);
1170
1171 if (!ok) {
1172 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1173 * so this is probably redundant */
1174 if (mdev->state.conn >= C_CONNECTED)
1175 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1176 }
1177 req_mod(req, ok ? handed_over_to_network : send_failed);
1178
1179 return ok;
1180}
1181
1182static int _drbd_may_sync_now(struct drbd_conf *mdev)
1183{
1184 struct drbd_conf *odev = mdev;
1185
1186 while (1) {
1187 if (odev->sync_conf.after == -1)
1188 return 1;
1189 odev = minor_to_mdev(odev->sync_conf.after);
1190 ERR_IF(!odev) return 1;
1191 if ((odev->state.conn >= C_SYNC_SOURCE &&
1192 odev->state.conn <= C_PAUSED_SYNC_T) ||
1193 odev->state.aftr_isp || odev->state.peer_isp ||
1194 odev->state.user_isp)
1195 return 0;
1196 }
1197}
1198
1199/**
1200 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1201 * @mdev: DRBD device.
1202 *
1203 * Called from process context only (admin command and after_state_ch).
1204 */
1205static int _drbd_pause_after(struct drbd_conf *mdev)
1206{
1207 struct drbd_conf *odev;
1208 int i, rv = 0;
1209
1210 for (i = 0; i < minor_count; i++) {
1211 odev = minor_to_mdev(i);
1212 if (!odev)
1213 continue;
1214 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1215 continue;
1216 if (!_drbd_may_sync_now(odev))
1217 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1218 != SS_NOTHING_TO_DO);
1219 }
1220
1221 return rv;
1222}
1223
1224/**
1225 * _drbd_resume_next() - Resume resync on all devices that may resync now
1226 * @mdev: DRBD device.
1227 *
1228 * Called from process context only (admin command and worker).
1229 */
1230static int _drbd_resume_next(struct drbd_conf *mdev)
1231{
1232 struct drbd_conf *odev;
1233 int i, rv = 0;
1234
1235 for (i = 0; i < minor_count; i++) {
1236 odev = minor_to_mdev(i);
1237 if (!odev)
1238 continue;
1239 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1240 continue;
1241 if (odev->state.aftr_isp) {
1242 if (_drbd_may_sync_now(odev))
1243 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1244 CS_HARD, NULL)
1245 != SS_NOTHING_TO_DO) ;
1246 }
1247 }
1248 return rv;
1249}
1250
1251void resume_next_sg(struct drbd_conf *mdev)
1252{
1253 write_lock_irq(&global_state_lock);
1254 _drbd_resume_next(mdev);
1255 write_unlock_irq(&global_state_lock);
1256}
1257
1258void suspend_other_sg(struct drbd_conf *mdev)
1259{
1260 write_lock_irq(&global_state_lock);
1261 _drbd_pause_after(mdev);
1262 write_unlock_irq(&global_state_lock);
1263}
1264
1265static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1266{
1267 struct drbd_conf *odev;
1268
1269 if (o_minor == -1)
1270 return NO_ERROR;
1271 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1272 return ERR_SYNC_AFTER;
1273
1274 /* check for loops */
1275 odev = minor_to_mdev(o_minor);
1276 while (1) {
1277 if (odev == mdev)
1278 return ERR_SYNC_AFTER_CYCLE;
1279
1280 /* dependency chain ends here, no cycles. */
1281 if (odev->sync_conf.after == -1)
1282 return NO_ERROR;
1283
1284 /* follow the dependency chain */
1285 odev = minor_to_mdev(odev->sync_conf.after);
1286 }
1287}
1288
1289int drbd_alter_sa(struct drbd_conf *mdev, int na)
1290{
1291 int changes;
1292 int retcode;
1293
1294 write_lock_irq(&global_state_lock);
1295 retcode = sync_after_error(mdev, na);
1296 if (retcode == NO_ERROR) {
1297 mdev->sync_conf.after = na;
1298 do {
1299 changes = _drbd_pause_after(mdev);
1300 changes |= _drbd_resume_next(mdev);
1301 } while (changes);
1302 }
1303 write_unlock_irq(&global_state_lock);
1304 return retcode;
1305}
1306
1307/**
1308 * drbd_start_resync() - Start the resync process
1309 * @mdev: DRBD device.
1310 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1311 *
1312 * This function might bring you directly into one of the
1313 * C_PAUSED_SYNC_* states.
1314 */
1315void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1316{
1317 union drbd_state ns;
1318 int r;
1319
1320 if (mdev->state.conn >= C_SYNC_SOURCE) {
1321 dev_err(DEV, "Resync already running!\n");
1322 return;
1323 }
1324
1325 trace_drbd_resync(mdev, TRACE_LVL_SUMMARY, "Resync starting: side=%s\n",
1326 side == C_SYNC_TARGET ? "SyncTarget" : "SyncSource");
1327
1328 /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1329 drbd_rs_cancel_all(mdev);
1330
1331 if (side == C_SYNC_TARGET) {
1332 /* Since application IO was locked out during C_WF_BITMAP_T and
1333 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1334 we check that we might make the data inconsistent. */
1335 r = drbd_khelper(mdev, "before-resync-target");
1336 r = (r >> 8) & 0xff;
1337 if (r > 0) {
1338 dev_info(DEV, "before-resync-target handler returned %d, "
1339 "dropping connection.\n", r);
1340 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1341 return;
1342 }
1343 }
1344
1345 drbd_state_lock(mdev);
1346
1347 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1348 drbd_state_unlock(mdev);
1349 return;
1350 }
1351
1352 if (side == C_SYNC_TARGET) {
1353 mdev->bm_resync_fo = 0;
1354 } else /* side == C_SYNC_SOURCE */ {
1355 u64 uuid;
1356
1357 get_random_bytes(&uuid, sizeof(u64));
1358 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1359 drbd_send_sync_uuid(mdev, uuid);
1360
1361 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1362 }
1363
1364 write_lock_irq(&global_state_lock);
1365 ns = mdev->state;
1366
1367 ns.aftr_isp = !_drbd_may_sync_now(mdev);
1368
1369 ns.conn = side;
1370
1371 if (side == C_SYNC_TARGET)
1372 ns.disk = D_INCONSISTENT;
1373 else /* side == C_SYNC_SOURCE */
1374 ns.pdsk = D_INCONSISTENT;
1375
1376 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1377 ns = mdev->state;
1378
1379 if (ns.conn < C_CONNECTED)
1380 r = SS_UNKNOWN_ERROR;
1381
1382 if (r == SS_SUCCESS) {
1383 mdev->rs_total =
1384 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1385 mdev->rs_failed = 0;
1386 mdev->rs_paused = 0;
1387 mdev->rs_start =
1388 mdev->rs_mark_time = jiffies;
1389 mdev->rs_same_csum = 0;
1390 _drbd_pause_after(mdev);
1391 }
1392 write_unlock_irq(&global_state_lock);
1393 drbd_state_unlock(mdev);
1394 put_ldev(mdev);
1395
1396 if (r == SS_SUCCESS) {
1397 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1398 drbd_conn_str(ns.conn),
1399 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1400 (unsigned long) mdev->rs_total);
1401
1402 if (mdev->rs_total == 0) {
1403 /* Peer still reachable? Beware of failing before-resync-target handlers! */
1404 request_ping(mdev);
1405 __set_current_state(TASK_INTERRUPTIBLE);
1406 schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead 10 */
1407 drbd_resync_finished(mdev);
1408 return;
1409 }
1410
1411 /* ns.conn may already be != mdev->state.conn,
1412 * we may have been paused in between, or become paused until
1413 * the timer triggers.
1414 * No matter, that is handled in resync_timer_fn() */
1415 if (ns.conn == C_SYNC_TARGET)
1416 mod_timer(&mdev->resync_timer, jiffies);
1417
1418 drbd_md_sync(mdev);
1419 }
1420}
1421
1422int drbd_worker(struct drbd_thread *thi)
1423{
1424 struct drbd_conf *mdev = thi->mdev;
1425 struct drbd_work *w = NULL;
1426 LIST_HEAD(work_list);
1427 int intr = 0, i;
1428
1429 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1430
1431 while (get_t_state(thi) == Running) {
1432 drbd_thread_current_set_cpu(mdev);
1433
1434 if (down_trylock(&mdev->data.work.s)) {
1435 mutex_lock(&mdev->data.mutex);
1436 if (mdev->data.socket && !mdev->net_conf->no_cork)
1437 drbd_tcp_uncork(mdev->data.socket);
1438 mutex_unlock(&mdev->data.mutex);
1439
1440 intr = down_interruptible(&mdev->data.work.s);
1441
1442 mutex_lock(&mdev->data.mutex);
1443 if (mdev->data.socket && !mdev->net_conf->no_cork)
1444 drbd_tcp_cork(mdev->data.socket);
1445 mutex_unlock(&mdev->data.mutex);
1446 }
1447
1448 if (intr) {
1449 D_ASSERT(intr == -EINTR);
1450 flush_signals(current);
1451 ERR_IF (get_t_state(thi) == Running)
1452 continue;
1453 break;
1454 }
1455
1456 if (get_t_state(thi) != Running)
1457 break;
1458 /* With this break, we have done a down() but not consumed
1459 the entry from the list. The cleanup code takes care of
1460 this... */
1461
1462 w = NULL;
1463 spin_lock_irq(&mdev->data.work.q_lock);
1464 ERR_IF(list_empty(&mdev->data.work.q)) {
1465 /* something terribly wrong in our logic.
1466 * we were able to down() the semaphore,
1467 * but the list is empty... doh.
1468 *
1469 * what is the best thing to do now?
1470 * try again from scratch, restarting the receiver,
1471 * asender, whatnot? could break even more ugly,
1472 * e.g. when we are primary, but no good local data.
1473 *
1474 * I'll try to get away just starting over this loop.
1475 */
1476 spin_unlock_irq(&mdev->data.work.q_lock);
1477 continue;
1478 }
1479 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1480 list_del_init(&w->list);
1481 spin_unlock_irq(&mdev->data.work.q_lock);
1482
1483 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1484 /* dev_warn(DEV, "worker: a callback failed! \n"); */
1485 if (mdev->state.conn >= C_CONNECTED)
1486 drbd_force_state(mdev,
1487 NS(conn, C_NETWORK_FAILURE));
1488 }
1489 }
1490 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1491 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1492
1493 spin_lock_irq(&mdev->data.work.q_lock);
1494 i = 0;
1495 while (!list_empty(&mdev->data.work.q)) {
1496 list_splice_init(&mdev->data.work.q, &work_list);
1497 spin_unlock_irq(&mdev->data.work.q_lock);
1498
1499 while (!list_empty(&work_list)) {
1500 w = list_entry(work_list.next, struct drbd_work, list);
1501 list_del_init(&w->list);
1502 w->cb(mdev, w, 1);
1503 i++; /* dead debugging code */
1504 }
1505
1506 spin_lock_irq(&mdev->data.work.q_lock);
1507 }
1508 sema_init(&mdev->data.work.s, 0);
1509 /* DANGEROUS race: if someone did queue his work within the spinlock,
1510 * but up() ed outside the spinlock, we could get an up() on the
1511 * semaphore without corresponding list entry.
1512 * So don't do that.
1513 */
1514 spin_unlock_irq(&mdev->data.work.q_lock);
1515
1516 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1517 /* _drbd_set_state only uses stop_nowait.
1518 * wait here for the Exiting receiver. */
1519 drbd_thread_stop(&mdev->receiver);
1520 drbd_mdev_cleanup(mdev);
1521
1522 dev_info(DEV, "worker terminated\n");
1523
1524 clear_bit(DEVICE_DYING, &mdev->flags);
1525 clear_bit(CONFIG_PENDING, &mdev->flags);
1526 wake_up(&mdev->state_wait);
1527
1528 return 0;
1529}