1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/mm.h>
46#include <linux/string.h>
47#include <linux/scatterlist.h>
48#include "drbd_int.h"
49#include "drbd_req.h"
50
51#include "drbd_vli.h"
52
53struct flush_work {
54 struct drbd_work w;
55 struct drbd_epoch *epoch;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
64static int drbd_do_handshake(struct drbd_conf *mdev);
65static int drbd_do_auth(struct drbd_conf *mdev);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71{
72 struct drbd_epoch *prev;
73 spin_lock(&mdev->epoch_lock);
74 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 if (prev == epoch || prev == mdev->current_epoch)
76 prev = NULL;
77 spin_unlock(&mdev->epoch_lock);
78 return prev;
79}
80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
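/* Take a page from the pre-allocated drbd_pp_pool if one is vacant,
 * otherwise try alloc_page(GFP_TRY). On success the device's pp_in_use
 * counter is incremented; returns NULL if neither source yields a page. */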
83static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
84{
85 struct page *page = NULL;
86
87 /* Yes, testing drbd_pp_vacant outside the lock is racy.
88 * So what. It saves a spin_lock. */
89 if (drbd_pp_vacant > 0) {
90 spin_lock(&drbd_pp_lock);
91 page = drbd_pp_pool;
92 if (page) {
93 drbd_pp_pool = (struct page *)page_private(page);
94 set_page_private(page, 0); /* just to be polite */
95 drbd_pp_vacant--;
96 }
97 spin_unlock(&drbd_pp_lock);
98 }
99 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100 * "criss-cross" setup, that might cause write-out on some other DRBD,
101 * which in turn might block on the other node at this very place. */
102 if (!page)
103 page = alloc_page(GFP_TRY);
104 if (page)
105 atomic_inc(&mdev->pp_in_use);
106 return page;
107}
108
109/* kick lower level device, if we have more than (arbitrary number)
110 * reference counts on it, which typically are locally submitted io
111 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
112static void maybe_kick_lo(struct drbd_conf *mdev)
113{
114 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
115 drbd_kick_lo(mdev);
116}
117
118static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
119{
120 struct drbd_epoch_entry *e;
121 struct list_head *le, *tle;
122
123 /* The EEs are always appended to the end of the list. Since
124 they are sent in order over the wire, they have to finish
125 in order. As soon as we see the first unfinished one, we can
126 stop examining the list... */
127
128 list_for_each_safe(le, tle, &mdev->net_ee) {
129 e = list_entry(le, struct drbd_epoch_entry, w.list);
130 if (drbd_bio_has_active_page(e->private_bio))
131 break;
132 list_move(le, to_be_freed);
133 }
134}
135
136static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
137{
138 LIST_HEAD(reclaimed);
139 struct drbd_epoch_entry *e, *t;
140
141 maybe_kick_lo(mdev);
142 spin_lock_irq(&mdev->req_lock);
143 reclaim_net_ee(mdev, &reclaimed);
144 spin_unlock_irq(&mdev->req_lock);
145
146 list_for_each_entry_safe(e, t, &reclaimed, w.list)
147 drbd_free_ee(mdev, e);
148}
149
150/**
151 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
152 * @mdev: DRBD device.
153 * @retry: whether or not to retry allocation forever (or until signalled)
154 *
155 * Tries to allocate a page, first from our own page pool, then from the
156 * kernel, unless this allocation would exceed the max_buffers setting.
157 * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158 */
159static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
160{
161 struct page *page = NULL;
162 DEFINE_WAIT(wait);
163
164 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
165 page = drbd_pp_first_page_or_try_alloc(mdev);
166 if (page)
167 return page;
168 }
169
170 for (;;) {
171 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172
173 drbd_kick_lo_and_reclaim_net(mdev);
174
175 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176 page = drbd_pp_first_page_or_try_alloc(mdev);
177 if (page)
178 break;
179 }
180
181 if (!retry)
182 break;
183
184 if (signal_pending(current)) {
185 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
186 break;
187 }
188
189 schedule();
190 }
191 finish_wait(&drbd_pp_wait, &wait);
192
193 return page;
194}
195
196/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
197 * Is also used from inside another spin_lock_irq(&mdev->req_lock) */
198static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199{
200 int free_it;
201
202 spin_lock(&drbd_pp_lock);
203 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204 free_it = 1;
205 } else {
206 set_page_private(page, (unsigned long)drbd_pp_pool);
207 drbd_pp_pool = page;
208 drbd_pp_vacant++;
209 free_it = 0;
210 }
211 spin_unlock(&drbd_pp_lock);
212
213 atomic_dec(&mdev->pp_in_use);
214
215 if (free_it)
216 __free_page(page);
217
218 wake_up(&drbd_pp_wait);
219}
220
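/* Hand the pages of @bio back to the drbd_pp_pool as long as the pool is
 * below its per-minor limit; surplus pages are released with put_page().
 * Subtracts bi_vcnt from pp_in_use and wakes waiters in drbd_pp_alloc(). */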
221static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222{
223 struct page *p_to_be_freed = NULL;
224 struct page *page;
225 struct bio_vec *bvec;
226 int i;
227
228 spin_lock(&drbd_pp_lock);
229 __bio_for_each_segment(bvec, bio, i, 0) {
230 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
231 set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
232 p_to_be_freed = bvec->bv_page;
233 } else {
234 set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
235 drbd_pp_pool = bvec->bv_page;
236 drbd_pp_vacant++;
237 }
238 }
239 spin_unlock(&drbd_pp_lock);
240 atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241
242 while (p_to_be_freed) {
243 page = p_to_be_freed;
244 p_to_be_freed = (struct page *)page_private(page);
245 set_page_private(page, 0); /* just to be polite */
246 put_page(page);
247 }
248
249 wake_up(&drbd_pp_wait);
250}
251
252/*
253You need to hold the req_lock:
254 _drbd_wait_ee_list_empty()
255
256You must not have the req_lock:
257 drbd_free_ee()
258 drbd_alloc_ee()
259 drbd_init_ee()
260 drbd_release_ee()
261 drbd_ee_fix_bhs()
262 drbd_process_done_ee()
263 drbd_clear_done_ee()
264 drbd_wait_ee_list_empty()
265*/
266
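/**
 * drbd_alloc_ee() - Allocate an epoch entry together with its private bio
 * @mdev:	DRBD device.
 * @id:		block id as sent by the peer (or ID_SYNCER).
 * @sector:	start sector on the backing device.
 * @data_size:	payload size in bytes.
 * @gfp_mask:	allocation flags; __GFP_WAIT decides whether drbd_pp_alloc()
 *		may block until a page becomes available.
 *
 * Builds a bio aimed at the backing device and fills it with pages from
 * the DRBD page pool. Returns NULL on allocation failure or when the
 * DRBD_FAULT_AL_EE fault injection point is active.
 */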
267struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
268 u64 id,
269 sector_t sector,
270 unsigned int data_size,
271 gfp_t gfp_mask) __must_hold(local)
272{
273 struct request_queue *q;
274 struct drbd_epoch_entry *e;
275 struct page *page;
276 struct bio *bio;
277 unsigned int ds;
278
279 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280 return NULL;
281
282 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
283 if (!e) {
284 if (!(gfp_mask & __GFP_NOWARN))
285 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
286 return NULL;
287 }
288
289 bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
290 if (!bio) {
291 if (!(gfp_mask & __GFP_NOWARN))
292 dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293 goto fail1;
294 }
295
296 bio->bi_bdev = mdev->ldev->backing_bdev;
297 bio->bi_sector = sector;
298
299 ds = data_size;
300 while (ds) {
301 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302 if (!page) {
303 if (!(gfp_mask & __GFP_NOWARN))
304 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305 goto fail2;
306 }
307 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308 drbd_pp_free(mdev, page);
309 dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310 "data_size=%u,ds=%u) failed\n",
311 (unsigned long long)sector, data_size, ds);
312
313 q = bdev_get_queue(bio->bi_bdev);
314 if (q->merge_bvec_fn) {
315 struct bvec_merge_data bvm = {
316 .bi_bdev = bio->bi_bdev,
317 .bi_sector = bio->bi_sector,
318 .bi_size = bio->bi_size,
319 .bi_rw = bio->bi_rw,
320 };
321 int l = q->merge_bvec_fn(q, &bvm,
322 &bio->bi_io_vec[bio->bi_vcnt]);
323 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324 }
325
326 /* dump more of the bio. */
327 dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328 dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329 dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330 dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331
332 goto fail2;
333 break;
334 }
335 ds -= min_t(int, ds, PAGE_SIZE);
336 }
337
338 D_ASSERT(data_size == bio->bi_size);
339
340 bio->bi_private = e;
341 e->mdev = mdev;
342 e->sector = sector;
343 e->size = bio->bi_size;
344
345 e->private_bio = bio;
346 e->block_id = id;
347 INIT_HLIST_NODE(&e->colision);
348 e->epoch = NULL;
349 e->flags = 0;
350
351 return e;
352
353 fail2:
354 drbd_pp_free_bio_pages(mdev, bio);
355 bio_put(bio);
356 fail1:
357 mempool_free(e, drbd_ee_mempool);
358
359 return NULL;
360}
361
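/* Counterpart of drbd_alloc_ee(): return the bio's pages to the page pool,
 * drop the bio reference and give the entry back to drbd_ee_mempool. */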
362void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363{
364 struct bio *bio = e->private_bio;
365 drbd_pp_free_bio_pages(mdev, bio);
366 bio_put(bio);
367 D_ASSERT(hlist_unhashed(&e->colision));
368 mempool_free(e, drbd_ee_mempool);
369}
370
371int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
372{
373 LIST_HEAD(work_list);
374 struct drbd_epoch_entry *e, *t;
375 int count = 0;
376
377 spin_lock_irq(&mdev->req_lock);
378 list_splice_init(list, &work_list);
379 spin_unlock_irq(&mdev->req_lock);
380
381 list_for_each_entry_safe(e, t, &work_list, w.list) {
382 drbd_free_ee(mdev, e);
383 count++;
384 }
385 return count;
386}
387
388
389/*
390 * This function is called from _asender only_
391 * but see also comments in _req_mod(,barrier_acked)
392 * and receive_Barrier.
393 *
394 * Move entries from net_ee to done_ee, if ready.
395 * Grab done_ee, call all callbacks, free the entries.
396 * The callbacks typically send out ACKs.
397 */
398static int drbd_process_done_ee(struct drbd_conf *mdev)
399{
400 LIST_HEAD(work_list);
401 LIST_HEAD(reclaimed);
402 struct drbd_epoch_entry *e, *t;
403 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
404
405 spin_lock_irq(&mdev->req_lock);
406 reclaim_net_ee(mdev, &reclaimed);
407 list_splice_init(&mdev->done_ee, &work_list);
408 spin_unlock_irq(&mdev->req_lock);
409
410 list_for_each_entry_safe(e, t, &reclaimed, w.list)
411 drbd_free_ee(mdev, e);
412
413 /* possible callbacks here:
414 * e_end_block, and e_end_resync_block, e_send_discard_ack.
415 * all ignore the last argument.
416 */
417 list_for_each_entry_safe(e, t, &work_list, w.list) {
418 /* list_del not necessary, next/prev members not touched */
419 ok = e->w.cb(mdev, &e->w, !ok) && ok;
420 drbd_free_ee(mdev, e);
421 }
422 wake_up(&mdev->ee_wait);
423
424 return ok;
425}
426
427void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
428{
429 DEFINE_WAIT(wait);
430
431 /* avoids spin_lock/unlock
432 * and calling prepare_to_wait in the fast path */
433 while (!list_empty(head)) {
434 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
435 spin_unlock_irq(&mdev->req_lock);
436 drbd_kick_lo(mdev);
437 schedule();
438 finish_wait(&mdev->ee_wait, &wait);
439 spin_lock_irq(&mdev->req_lock);
440 }
441}
442
443void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444{
445 spin_lock_irq(&mdev->req_lock);
446 _drbd_wait_ee_list_empty(mdev, head);
447 spin_unlock_irq(&mdev->req_lock);
448}
449
450/* see also kernel_accept, which is only present since 2.6.18;
451 * we also want to log exactly which part of it failed */
452static int drbd_accept(struct drbd_conf *mdev, const char **what,
453 struct socket *sock, struct socket **newsock)
454{
455 struct sock *sk = sock->sk;
456 int err = 0;
457
458 *what = "listen";
459 err = sock->ops->listen(sock, 5);
460 if (err < 0)
461 goto out;
462
463 *what = "sock_create_lite";
464 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
465 newsock);
466 if (err < 0)
467 goto out;
468
469 *what = "accept";
470 err = sock->ops->accept(sock, *newsock, 0);
471 if (err < 0) {
472 sock_release(*newsock);
473 *newsock = NULL;
474 goto out;
475 }
476 (*newsock)->ops = sock->ops;
477
478out:
479 return err;
480}
481
482static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
483 void *buf, size_t size, int flags)
484{
485 mm_segment_t oldfs;
486 struct kvec iov = {
487 .iov_base = buf,
488 .iov_len = size,
489 };
490 struct msghdr msg = {
491 .msg_iovlen = 1,
492 .msg_iov = (struct iovec *)&iov,
493 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
494 };
495 int rv;
496
497 oldfs = get_fs();
498 set_fs(KERNEL_DS);
499 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
500 set_fs(oldfs);
501
502 return rv;
503}
504
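/* Receive exactly @size bytes from the data socket, blocking with
 * MSG_WAITALL. A short read, an error or EOF forces the connection
 * into C_BROKEN_PIPE so that it gets torn down. */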
505static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
506{
507 mm_segment_t oldfs;
508 struct kvec iov = {
509 .iov_base = buf,
510 .iov_len = size,
511 };
512 struct msghdr msg = {
513 .msg_iovlen = 1,
514 .msg_iov = (struct iovec *)&iov,
515 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
516 };
517 int rv;
518
519 oldfs = get_fs();
520 set_fs(KERNEL_DS);
521
522 for (;;) {
523 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
524 if (rv == size)
525 break;
526
527 /* Note:
528 * ECONNRESET other side closed the connection
529 * ERESTARTSYS (on sock) we got a signal
530 */
531
532 if (rv < 0) {
533 if (rv == -ECONNRESET)
534 dev_info(DEV, "sock was reset by peer\n");
535 else if (rv != -ERESTARTSYS)
536 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
537 break;
538 } else if (rv == 0) {
539 dev_info(DEV, "sock was shut down by peer\n");
540 break;
541 } else {
542 /* signal came in, or peer/link went down,
543 * after we read a partial message
544 */
545 /* D_ASSERT(signal_pending(current)); */
546 break;
547 }
548 };
549
550 set_fs(oldfs);
551
552 if (rv != size)
553 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
554
555 return rv;
556}
557
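/* Actively establish the TCP connection to the peer: create a socket,
 * bind it to the configured local address (port 0, so the kernel picks
 * one) and connect to the peer address. "Peer not there yet" type errors
 * leave the connection state alone; real failures force C_DISCONNECTING.
 * Returns the connected socket, or NULL. */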
558static struct socket *drbd_try_connect(struct drbd_conf *mdev)
559{
560 const char *what;
561 struct socket *sock;
562 struct sockaddr_in6 src_in6;
563 int err;
564 int disconnect_on_error = 1;
565
566 if (!get_net_conf(mdev))
567 return NULL;
568
569 what = "sock_create_kern";
570 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
571 SOCK_STREAM, IPPROTO_TCP, &sock);
572 if (err < 0) {
573 sock = NULL;
574 goto out;
575 }
576
577 sock->sk->sk_rcvtimeo =
578 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
579
580 /* explicitly bind to the configured IP as source IP
581 * for the outgoing connections.
582 * This is needed for multihomed hosts and to be
583 * able to use lo: interfaces for drbd.
584 * Make sure to use 0 as port number, so linux selects
585 * a free one dynamically.
586 */
587 memcpy(&src_in6, mdev->net_conf->my_addr,
588 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
589 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
590 src_in6.sin6_port = 0;
591 else
592 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
593
594 what = "bind before connect";
595 err = sock->ops->bind(sock,
596 (struct sockaddr *) &src_in6,
597 mdev->net_conf->my_addr_len);
598 if (err < 0)
599 goto out;
600
601 /* connect may fail, peer not yet available.
602 * stay C_WF_CONNECTION, don't go Disconnecting! */
603 disconnect_on_error = 0;
604 what = "connect";
605 err = sock->ops->connect(sock,
606 (struct sockaddr *)mdev->net_conf->peer_addr,
607 mdev->net_conf->peer_addr_len, 0);
608
609out:
610 if (err < 0) {
611 if (sock) {
612 sock_release(sock);
613 sock = NULL;
614 }
615 switch (-err) {
616 /* timeout, busy, signal pending */
617 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
618 case EINTR: case ERESTARTSYS:
619 /* peer not (yet) available, network problem */
620 case ECONNREFUSED: case ENETUNREACH:
621 case EHOSTDOWN: case EHOSTUNREACH:
622 disconnect_on_error = 0;
623 break;
624 default:
625 dev_err(DEV, "%s failed, err = %d\n", what, err);
626 }
627 if (disconnect_on_error)
628 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
629 }
630 put_net_conf(mdev);
631 return sock;
632}
633
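/* Passive side of the connection setup: bind a listening socket to our
 * configured address and accept a single incoming connection within a
 * (jittered) try_connect_int timeout. Returns the established socket,
 * or NULL on timeout or error. */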
634static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
635{
636 int timeo, err;
637 struct socket *s_estab = NULL, *s_listen;
638 const char *what;
639
640 if (!get_net_conf(mdev))
641 return NULL;
642
643 what = "sock_create_kern";
644 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
645 SOCK_STREAM, IPPROTO_TCP, &s_listen);
646 if (err) {
647 s_listen = NULL;
648 goto out;
649 }
650
651 timeo = mdev->net_conf->try_connect_int * HZ;
652 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
653
654 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
655 s_listen->sk->sk_rcvtimeo = timeo;
656 s_listen->sk->sk_sndtimeo = timeo;
657
658 what = "bind before listen";
659 err = s_listen->ops->bind(s_listen,
660 (struct sockaddr *) mdev->net_conf->my_addr,
661 mdev->net_conf->my_addr_len);
662 if (err < 0)
663 goto out;
664
665 err = drbd_accept(mdev, &what, s_listen, &s_estab);
666
667out:
668 if (s_listen)
669 sock_release(s_listen);
670 if (err < 0) {
671 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
672 dev_err(DEV, "%s failed, err = %d\n", what, err);
673 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
674 }
675 }
676 put_net_conf(mdev);
677
678 return s_estab;
679}
680
681static int drbd_send_fp(struct drbd_conf *mdev,
682 struct socket *sock, enum drbd_packets cmd)
683{
684 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
685
686 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
687}
688
689static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
690{
691 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692 int rr;
693
694 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
695
696 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
697 return be16_to_cpu(h->command);
698
699 return 0xffff;
700}
701
702/**
703 * drbd_socket_okay() - Free the socket if its connection is not okay
704 * @mdev: DRBD device.
705 * @sock: pointer to the pointer to the socket.
706 */
707static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
708{
709 int rr;
710 char tb[4];
711
712 if (!*sock)
713 return FALSE;
714
715 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
716
717 if (rr > 0 || rr == -EAGAIN) {
718 return TRUE;
719 } else {
720 sock_release(*sock);
721 *sock = NULL;
722 return FALSE;
723 }
724}
725
726/*
727 * return values:
728 * 1 yes, we have a valid connection
729 * 0 oops, did not work out, please try again
730 * -1 peer talks different language,
731 * no point in trying again, please go standalone.
732 * -2 We do not have a network config...
733 */
734static int drbd_connect(struct drbd_conf *mdev)
735{
736 struct socket *s, *sock, *msock;
737 int try, h, ok;
738
739 D_ASSERT(!mdev->data.socket);
740
741 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
742 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
743
744 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
745 return -2;
746
747 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
748
749 sock = NULL;
750 msock = NULL;
751
752 do {
753 for (try = 0;;) {
754 /* 3 tries, this should take less than a second! */
755 s = drbd_try_connect(mdev);
756 if (s || ++try >= 3)
757 break;
758 /* give the other side time to call bind() & listen() */
759 __set_current_state(TASK_INTERRUPTIBLE);
760 schedule_timeout(HZ / 10);
761 }
762
763 if (s) {
764 if (!sock) {
765 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
766 sock = s;
767 s = NULL;
768 } else if (!msock) {
769 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
770 msock = s;
771 s = NULL;
772 } else {
773 dev_err(DEV, "Logic error in drbd_connect()\n");
774 goto out_release_sockets;
775 }
776 }
777
778 if (sock && msock) {
779 __set_current_state(TASK_INTERRUPTIBLE);
780 schedule_timeout(HZ / 10);
781 ok = drbd_socket_okay(mdev, &sock);
782 ok = drbd_socket_okay(mdev, &msock) && ok;
783 if (ok)
784 break;
785 }
786
787retry:
788 s = drbd_wait_for_connect(mdev);
789 if (s) {
790 try = drbd_recv_fp(mdev, s);
791 drbd_socket_okay(mdev, &sock);
792 drbd_socket_okay(mdev, &msock);
793 switch (try) {
794 case P_HAND_SHAKE_S:
795 if (sock) {
796 dev_warn(DEV, "initial packet S crossed\n");
797 sock_release(sock);
798 }
799 sock = s;
800 break;
801 case P_HAND_SHAKE_M:
802 if (msock) {
803 dev_warn(DEV, "initial packet M crossed\n");
804 sock_release(msock);
805 }
806 msock = s;
807 set_bit(DISCARD_CONCURRENT, &mdev->flags);
808 break;
809 default:
810 dev_warn(DEV, "Error receiving initial packet\n");
811 sock_release(s);
812 if (random32() & 1)
813 goto retry;
814 }
815 }
816
817 if (mdev->state.conn <= C_DISCONNECTING)
818 goto out_release_sockets;
819 if (signal_pending(current)) {
820 flush_signals(current);
821 smp_rmb();
822 if (get_t_state(&mdev->receiver) == Exiting)
823 goto out_release_sockets;
824 }
825
826 if (sock && msock) {
827 ok = drbd_socket_okay(mdev, &sock);
828 ok = drbd_socket_okay(mdev, &msock) && ok;
829 if (ok)
830 break;
831 }
832 } while (1);
833
834 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
835 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836
837 sock->sk->sk_allocation = GFP_NOIO;
838 msock->sk->sk_allocation = GFP_NOIO;
839
840 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
841 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
842
843 if (mdev->net_conf->sndbuf_size) {
844 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
845 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
846 }
847
848 if (mdev->net_conf->rcvbuf_size) {
849 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
850 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
851 }
852
853 /* NOT YET ...
854 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856 * first set it to the P_HAND_SHAKE timeout,
857 * which we set to 4x the configured ping_timeout. */
858 sock->sk->sk_sndtimeo =
859 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860
861 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863
864 /* we don't want delays.
865 * we use TCP_CORK where appropriate, though */
866 drbd_tcp_nodelay(sock);
867 drbd_tcp_nodelay(msock);
868
869 mdev->data.socket = sock;
870 mdev->meta.socket = msock;
871 mdev->last_received = jiffies;
872
873 D_ASSERT(mdev->asender.task == NULL);
874
875 h = drbd_do_handshake(mdev);
876 if (h <= 0)
877 return h;
878
879 if (mdev->cram_hmac_tfm) {
880 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
881 switch (drbd_do_auth(mdev)) {
882 case -1:
883 dev_err(DEV, "Authentication of peer failed\n");
884 return -1;
885 case 0:
886 dev_err(DEV, "Authentication of peer failed, trying again.\n");
887 return 0;
888 }
889 }
890
891 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892 return 0;
893
894 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896
897 atomic_set(&mdev->packet_seq, 0);
898 mdev->peer_seq = 0;
899
900 drbd_thread_start(&mdev->asender);
901
902 if (!drbd_send_protocol(mdev))
903 return -1;
904 drbd_send_sync_param(mdev, &mdev->sync_conf);
905 drbd_send_sizes(mdev, 0);
906 drbd_send_uuids(mdev);
907 drbd_send_state(mdev);
908 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
909 clear_bit(RESIZE_PENDING, &mdev->flags);
910
911 return 1;
912
913out_release_sockets:
914 if (sock)
915 sock_release(sock);
916 if (msock)
917 sock_release(msock);
918 return -1;
919}
920
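/* Read one packet header from the data socket, convert command and length
 * to host byte order and verify the DRBD magic. Updates last_received and
 * returns TRUE on success, FALSE on a short read or bad magic. */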
921static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
922{
923 int r;
924
925 r = drbd_recv(mdev, h, sizeof(*h));
926
927 if (unlikely(r != sizeof(*h))) {
928 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
929 return FALSE;
930 };
931 h->command = be16_to_cpu(h->command);
932 h->length = be16_to_cpu(h->length);
933 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
934 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
935 (long)be32_to_cpu(h->magic),
936 h->command, h->length);
937 return FALSE;
938 }
939 mdev->last_received = jiffies;
940
941 return TRUE;
942}
943
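/* Flush the backing device if the current write ordering method asks for
 * it, then try to finish @epoch with EV_BARRIER_DONE. A failed flush
 * demotes the write ordering method to WO_drain_io. */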
944static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
945{
946 int rv;
947
948 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
949 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
950 NULL, BLKDEV_IFL_WAIT);
951 if (rv) {
952 dev_err(DEV, "local disk flush failed with status %d\n", rv);
953 /* would rather check on EOPNOTSUPP, but that is not reliable.
954 * don't try again for ANY return value != 0
955 * if (rv == -EOPNOTSUPP) */
956 drbd_bump_write_ordering(mdev, WO_drain_io);
957 }
958 put_ldev(mdev);
959 }
960
961 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
962}
963
964static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
965{
966 struct flush_work *fw = (struct flush_work *)w;
967 struct drbd_epoch *epoch = fw->epoch;
968
969 kfree(w);
970
971 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
972 drbd_flush_after_epoch(mdev, epoch);
973
974 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
975 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
976
977 return 1;
978}
979
980/**
981 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
982 * @mdev: DRBD device.
983 * @epoch: Epoch object.
984 * @ev: Epoch event.
985 */
986static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
987 struct drbd_epoch *epoch,
988 enum epoch_event ev)
989{
990 int finish, epoch_size;
991 struct drbd_epoch *next_epoch;
992 int schedule_flush = 0;
993 enum finish_epoch rv = FE_STILL_LIVE;
994
995 spin_lock(&mdev->epoch_lock);
996 do {
997 next_epoch = NULL;
998 finish = 0;
999
1000 epoch_size = atomic_read(&epoch->epoch_size);
1001
1002 switch (ev & ~EV_CLEANUP) {
1003 case EV_PUT:
1004 atomic_dec(&epoch->active);
1005 break;
1006 case EV_GOT_BARRIER_NR:
1007 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1008
1009 /* Special case: If we just switched from WO_bio_barrier to
1010 WO_bdev_flush we should not finish the current epoch */
1011 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1012 mdev->write_ordering != WO_bio_barrier &&
1013 epoch == mdev->current_epoch)
1014 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1015 break;
1016 case EV_BARRIER_DONE:
1017 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1018 break;
1019 case EV_BECAME_LAST:
1020 /* nothing to do*/
1021 break;
1022 }
1023
1024 if (epoch_size != 0 &&
1025 atomic_read(&epoch->active) == 0 &&
1026 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1027 epoch->list.prev == &mdev->current_epoch->list &&
1028 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1029 /* Nearly all conditions are met to finish that epoch... */
1030 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1031 mdev->write_ordering == WO_none ||
1032 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1033 ev & EV_CLEANUP) {
1034 finish = 1;
1035 set_bit(DE_IS_FINISHING, &epoch->flags);
1036 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1037 mdev->write_ordering == WO_bio_barrier) {
1038 atomic_inc(&epoch->active);
1039 schedule_flush = 1;
1040 }
1041 }
1042 if (finish) {
1043 if (!(ev & EV_CLEANUP)) {
1044 spin_unlock(&mdev->epoch_lock);
1045 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1046 spin_lock(&mdev->epoch_lock);
1047 }
1048 dec_unacked(mdev);
1049
1050 if (mdev->current_epoch != epoch) {
1051 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1052 list_del(&epoch->list);
1053 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1054 mdev->epochs--;
1055 kfree(epoch);
1056
1057 if (rv == FE_STILL_LIVE)
1058 rv = FE_DESTROYED;
1059 } else {
1060 epoch->flags = 0;
1061 atomic_set(&epoch->epoch_size, 0);
1062 /* atomic_set(&epoch->active, 0); is already zero */
1063 if (rv == FE_STILL_LIVE)
1064 rv = FE_RECYCLED;
1065 }
1066 }
1067
1068 if (!next_epoch)
1069 break;
1070
1071 epoch = next_epoch;
1072 } while (1);
1073
1074 spin_unlock(&mdev->epoch_lock);
1075
1076 if (schedule_flush) {
1077 struct flush_work *fw;
1078 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1079 if (fw) {
1080 fw->w.cb = w_flush;
1081 fw->epoch = epoch;
1082 drbd_queue_work(&mdev->data.work, &fw->w);
1083 } else {
1084 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1085 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1086 /* That is not a recursion, only one level */
1087 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1088 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1089 }
1090 }
1091
1092 return rv;
1093}
1094
1095/**
1096 * drbd_bump_write_ordering() - Fall back to another write ordering method
1097 * @mdev: DRBD device.
1098 * @wo: Write ordering method to try.
1099 */
1100void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1101{
1102 enum write_ordering_e pwo;
1103 static char *write_ordering_str[] = {
1104 [WO_none] = "none",
1105 [WO_drain_io] = "drain",
1106 [WO_bdev_flush] = "flush",
1107 [WO_bio_barrier] = "barrier",
1108 };
1109
1110 pwo = mdev->write_ordering;
1111 wo = min(pwo, wo);
1112 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1113 wo = WO_bdev_flush;
1114 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1115 wo = WO_drain_io;
1116 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1117 wo = WO_none;
1118 mdev->write_ordering = wo;
1119 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1120 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1121}
1122
1123/**
1124 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1125 * @mdev: DRBD device.
1126 * @w: work object.
1127 * @cancel: The connection will be closed anyways (unused in this callback)
1128 */
1129int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1130{
1131 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1132 struct bio *bio = e->private_bio;
1133
1134 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1135 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1136 so that we can finish that epoch in drbd_may_finish_epoch().
1137 That is necessary if we already have a long chain of Epochs, before
1138 we realize that BIO_RW_BARRIER is actually not supported */
1139
1140 /* As long as the -ENOTSUPP on the barrier is reported immediately
1141 that will never trigger. If it is reported late, we will just
1142 print that warning and continue correctly for all future requests
1143 with WO_bdev_flush */
1144 if (previous_epoch(mdev, e->epoch))
1145 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1146
1147 /* prepare bio for re-submit,
1148 * re-init volatile members */
1149 /* we still have a local reference,
1150 * get_ldev was done in receive_Data. */
1151 bio->bi_bdev = mdev->ldev->backing_bdev;
1152 bio->bi_sector = e->sector;
1153 bio->bi_size = e->size;
1154 bio->bi_idx = 0;
1155
1156 bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1157 bio->bi_flags |= 1 << BIO_UPTODATE;
1158
1159 /* don't know whether this is necessary: */
1160 bio->bi_phys_segments = 0;
1161 bio->bi_next = NULL;
1162
1163 /* these should be unchanged: */
1164 /* bio->bi_end_io = drbd_endio_write_sec; */
1165 /* bio->bi_vcnt = whatever; */
1166
1167 e->w.cb = e_end_block;
1168
1169 /* This is no longer a barrier request. */
1170 bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1171
1172 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1173
1174 return 1;
1175}
1176
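/* Handle a P_BARRIER packet: record the barrier number in the current
 * epoch and, depending on the write ordering method, either recycle that
 * epoch, flush/drain the backing device, or allocate a fresh epoch object
 * for the writes that follow. */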
1177static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1178{
1179 int rv, issue_flush;
1180 struct p_barrier *p = (struct p_barrier *)h;
1181 struct drbd_epoch *epoch;
1182
1183 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1184
1185 rv = drbd_recv(mdev, h->payload, h->length);
1186 ERR_IF(rv != h->length) return FALSE;
1187
1188 inc_unacked(mdev);
1189
1190 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1191 drbd_kick_lo(mdev);
1192
1193 mdev->current_epoch->barrier_nr = p->barrier;
1194 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1195
1196 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1197 * the activity log, which means it would not be resynced in case the
1198 * R_PRIMARY crashes now.
1199 * Therefore we must send the barrier_ack after the barrier request was
1200 * completed. */
1201 switch (mdev->write_ordering) {
1202 case WO_bio_barrier:
1203 case WO_none:
1204 if (rv == FE_RECYCLED)
1205 return TRUE;
1206 break;
1207
1208 case WO_bdev_flush:
1209 case WO_drain_io:
1210 if (rv == FE_STILL_LIVE) {
1211 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1212 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1213 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1214 }
1215 if (rv == FE_RECYCLED)
1216 return TRUE;
1217
1218 /* The asender will send all the ACKs and barrier ACKs out, since
1219 all EEs moved from the active_ee to the done_ee. We need to
1220 provide a new epoch object for the EEs that come in soon */
1221 break;
1222 }
1223
1224 /* receiver context, in the writeout path of the other node.
1225 * avoid potential distributed deadlock */
1226 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1227 if (!epoch) {
1228 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1229 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1230 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1231 if (issue_flush) {
1232 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1233 if (rv == FE_RECYCLED)
1234 return TRUE;
1235 }
1236
1237 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1238
1239 return TRUE;
1240 }
1241
1242 epoch->flags = 0;
1243 atomic_set(&epoch->epoch_size, 0);
1244 atomic_set(&epoch->active, 0);
1245
1246 spin_lock(&mdev->epoch_lock);
1247 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1248 list_add(&epoch->list, &mdev->current_epoch->list);
1249 mdev->current_epoch = epoch;
1250 mdev->epochs++;
1251 } else {
1252 /* The current_epoch got recycled while we allocated this one... */
1253 kfree(epoch);
1254 }
1255 spin_unlock(&mdev->epoch_lock);
1256
1257 return TRUE;
1258}
1259
1260/* used from receive_RSDataReply (recv_resync_read)
1261 * and from receive_Data */
1262static struct drbd_epoch_entry *
1263read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1264{
1265 struct drbd_epoch_entry *e;
1266 struct bio_vec *bvec;
1267 struct page *page;
1268 struct bio *bio;
1269 int dgs, ds, i, rr;
1270 void *dig_in = mdev->int_dig_in;
1271 void *dig_vv = mdev->int_dig_vv;
1272
1273 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1274 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1275
1276 if (dgs) {
1277 rr = drbd_recv(mdev, dig_in, dgs);
1278 if (rr != dgs) {
1279 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1280 rr, dgs);
1281 return NULL;
1282 }
1283 }
1284
1285 data_size -= dgs;
1286
1287 ERR_IF(data_size & 0x1ff) return NULL;
1288 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1289
1290 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1291 * "criss-cross" setup, that might cause write-out on some other DRBD,
1292 * which in turn might block on the other node at this very place. */
1293 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1294 if (!e)
1295 return NULL;
1296 bio = e->private_bio;
1297 ds = data_size;
1298 bio_for_each_segment(bvec, bio, i) {
1299 page = bvec->bv_page;
1300 rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1301 kunmap(page);
1302 if (rr != min_t(int, ds, PAGE_SIZE)) {
1303 drbd_free_ee(mdev, e);
1304 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1305 rr, min_t(int, ds, PAGE_SIZE));
1306 return NULL;
1307 }
1308 ds -= rr;
1309 }
1310
1311 if (dgs) {
1312 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1313 if (memcmp(dig_in, dig_vv, dgs)) {
1314 dev_err(DEV, "Digest integrity check FAILED.\n");
1315 drbd_bcast_ee(mdev, "digest failed",
1316 dgs, dig_in, dig_vv, e);
1317 drbd_free_ee(mdev, e);
1318 return NULL;
1319 }
1320 }
1321 mdev->recv_cnt += data_size>>9;
1322 return e;
1323}
1324
1325/* drbd_drain_block() just takes a data block
1326 * out of the socket input buffer, and discards it.
1327 */
1328static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1329{
1330 struct page *page;
1331 int rr, rv = 1;
1332 void *data;
1333
1334 if (!data_size)
1335 return TRUE;
1336
1337 page = drbd_pp_alloc(mdev, 1);
1338
1339 data = kmap(page);
1340 while (data_size) {
1341 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1342 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1343 rv = 0;
1344 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1345 rr, min_t(int, data_size, PAGE_SIZE));
1346 break;
1347 }
1348 data_size -= rr;
1349 }
1350 kunmap(page);
1351 drbd_pp_free(mdev, page);
1352 return rv;
1353}
1354
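/* Receive the payload of a data reply directly into the pages of the
 * original request's master bio, verifying the optional data digest.
 * Returns 1 on success, 0 on a short read or digest mismatch. */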
1355static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1356 sector_t sector, int data_size)
1357{
1358 struct bio_vec *bvec;
1359 struct bio *bio;
1360 int dgs, rr, i, expect;
1361 void *dig_in = mdev->int_dig_in;
1362 void *dig_vv = mdev->int_dig_vv;
1363
1364 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1365 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1366
1367 if (dgs) {
1368 rr = drbd_recv(mdev, dig_in, dgs);
1369 if (rr != dgs) {
1370 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1371 rr, dgs);
1372 return 0;
1373 }
1374 }
1375
1376 data_size -= dgs;
1377
1378 /* optimistically update recv_cnt. if receiving fails below,
1379 * we disconnect anyways, and counters will be reset. */
1380 mdev->recv_cnt += data_size>>9;
1381
1382 bio = req->master_bio;
1383 D_ASSERT(sector == bio->bi_sector);
1384
1385 bio_for_each_segment(bvec, bio, i) {
1386 expect = min_t(int, data_size, bvec->bv_len);
1387 rr = drbd_recv(mdev,
1388 kmap(bvec->bv_page)+bvec->bv_offset,
1389 expect);
1390 kunmap(bvec->bv_page);
1391 if (rr != expect) {
1392 dev_warn(DEV, "short read receiving data reply: "
1393 "read %d expected %d\n",
1394 rr, expect);
1395 return 0;
1396 }
1397 data_size -= rr;
1398 }
1399
1400 if (dgs) {
1401 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1402 if (memcmp(dig_in, dig_vv, dgs)) {
1403 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1404 return 0;
1405 }
1406 }
1407
1408 D_ASSERT(data_size == 0);
1409 return 1;
1410}
1411
1412/* e_end_resync_block() is called via
1413 * drbd_process_done_ee() by asender only */
1414static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1415{
1416 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1417 sector_t sector = e->sector;
1418 int ok;
1419
1420 D_ASSERT(hlist_unhashed(&e->colision));
1421
1422 if (likely(drbd_bio_uptodate(e->private_bio))) {
1423 drbd_set_in_sync(mdev, sector, e->size);
1424 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1425 } else {
1426 /* Record failure to sync */
1427 drbd_rs_failed_io(mdev, sector, e->size);
1428
1429 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1430 }
1431 dec_unacked(mdev);
1432
1433 return ok;
1434}
1435
1436static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1437{
1438 struct drbd_epoch_entry *e;
1439
1440 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1441 if (!e) {
1442 put_ldev(mdev);
1443 return FALSE;
1444 }
1445
1446 dec_rs_pending(mdev);
1447
1448 e->private_bio->bi_end_io = drbd_endio_write_sec;
1449 e->private_bio->bi_rw = WRITE;
1450 e->w.cb = e_end_resync_block;
1451
1452 inc_unacked(mdev);
1453 /* corresponding dec_unacked() in e_end_resync_block()
1454 * respective _drbd_clear_done_ee */
1455
1456 spin_lock_irq(&mdev->req_lock);
1457 list_add(&e->w.list, &mdev->sync_ee);
1458 spin_unlock_irq(&mdev->req_lock);
1459
1460 drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1461 /* accounting done in endio */
1462
1463 maybe_kick_lo(mdev);
1464 return TRUE;
1465}
1466
1467static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1468{
1469 struct drbd_request *req;
1470 sector_t sector;
1471 unsigned int header_size, data_size;
1472 int ok;
1473 struct p_data *p = (struct p_data *)h;
1474
1475 header_size = sizeof(*p) - sizeof(*h);
1476 data_size = h->length - header_size;
1477
1478 ERR_IF(data_size == 0) return FALSE;
1479
1480 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1481 return FALSE;
1482
1483 sector = be64_to_cpu(p->sector);
1484
1485 spin_lock_irq(&mdev->req_lock);
1486 req = _ar_id_to_req(mdev, p->block_id, sector);
1487 spin_unlock_irq(&mdev->req_lock);
1488 if (unlikely(!req)) {
1489 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1490 return FALSE;
1491 }
1492
1493 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1494 * special casing it there for the various failure cases.
1495 * still no race with drbd_fail_pending_reads */
1496 ok = recv_dless_read(mdev, req, sector, data_size);
1497
1498 if (ok)
1499 req_mod(req, data_received);
1500 /* else: nothing. handled from drbd_disconnect...
1501 * I don't think we may complete this just yet
1502 * in case we are "on-disconnect: freeze" */
1503
1504 return ok;
1505}
1506
1507static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1508{
1509 sector_t sector;
1510 unsigned int header_size, data_size;
1511 int ok;
1512 struct p_data *p = (struct p_data *)h;
1513
1514 header_size = sizeof(*p) - sizeof(*h);
1515 data_size = h->length - header_size;
1516
1517 ERR_IF(data_size == 0) return FALSE;
1518
1519 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1520 return FALSE;
1521
1522 sector = be64_to_cpu(p->sector);
1523 D_ASSERT(p->block_id == ID_SYNCER);
1524
1525 if (get_ldev(mdev)) {
1526 /* data is submitted to disk within recv_resync_read.
1527 * corresponding put_ldev done below on error,
1528 * or in drbd_endio_write_sec. */
1529 ok = recv_resync_read(mdev, sector, data_size);
1530 } else {
1531 if (__ratelimit(&drbd_ratelimit_state))
1532 dev_err(DEV, "Can not write resync data to local disk.\n");
1533
1534 ok = drbd_drain_block(mdev, data_size);
1535
1536 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1537 }
1538
1539 return ok;
1540}
1541
1542/* e_end_block() is called via drbd_process_done_ee().
1543 * this means this function only runs in the asender thread
1544 */
1545static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1546{
1547 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1548 sector_t sector = e->sector;
1549 struct drbd_epoch *epoch;
1550 int ok = 1, pcmd;
1551
1552 if (e->flags & EE_IS_BARRIER) {
1553 epoch = previous_epoch(mdev, e->epoch);
1554 if (epoch)
1555 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1556 }
1557
1558 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1559 if (likely(drbd_bio_uptodate(e->private_bio))) {
1560 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1561 mdev->state.conn <= C_PAUSED_SYNC_T &&
1562 e->flags & EE_MAY_SET_IN_SYNC) ?
1563 P_RS_WRITE_ACK : P_WRITE_ACK;
1564 ok &= drbd_send_ack(mdev, pcmd, e);
1565 if (pcmd == P_RS_WRITE_ACK)
1566 drbd_set_in_sync(mdev, sector, e->size);
1567 } else {
1568 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1569 /* we expect it to be marked out of sync anyways...
1570 * maybe assert this? */
1571 }
1572 dec_unacked(mdev);
1573 }
1574 /* we delete from the conflict detection hash _after_ we sent out the
1575 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1576 if (mdev->net_conf->two_primaries) {
1577 spin_lock_irq(&mdev->req_lock);
1578 D_ASSERT(!hlist_unhashed(&e->colision));
1579 hlist_del_init(&e->colision);
1580 spin_unlock_irq(&mdev->req_lock);
1581 } else {
1582 D_ASSERT(hlist_unhashed(&e->colision));
1583 }
1584
1585 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1586
1587 return ok;
1588}
1589
1590static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1591{
1592 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1593 int ok = 1;
1594
1595 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1596 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1597
1598 spin_lock_irq(&mdev->req_lock);
1599 D_ASSERT(!hlist_unhashed(&e->colision));
1600 hlist_del_init(&e->colision);
1601 spin_unlock_irq(&mdev->req_lock);
1602
1603 dec_unacked(mdev);
1604
1605 return ok;
1606}
1607
1608/* Called from receive_Data.
1609 * Synchronize packets on sock with packets on msock.
1610 *
1611 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1612 * packet traveling on msock, they are still processed in the order they have
1613 * been sent.
1614 *
1615 * Note: we don't care for Ack packets overtaking P_DATA packets.
1616 *
1617 * In case packet_seq is larger than mdev->peer_seq number, there are
1618 * outstanding packets on the msock. We wait for them to arrive.
1619 * In case we are the logically next packet, we update mdev->peer_seq
1620 * ourselves. Correctly handles 32bit wrap around.
1621 *
1622 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1623 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1624 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1625 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1626 *
1627 * returns 0 if we may process the packet,
1628 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1629static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1630{
1631 DEFINE_WAIT(wait);
1632 unsigned int p_seq;
1633 long timeout;
1634 int ret = 0;
1635 spin_lock(&mdev->peer_seq_lock);
1636 for (;;) {
1637 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1638 if (seq_le(packet_seq, mdev->peer_seq+1))
1639 break;
1640 if (signal_pending(current)) {
1641 ret = -ERESTARTSYS;
1642 break;
1643 }
1644 p_seq = mdev->peer_seq;
1645 spin_unlock(&mdev->peer_seq_lock);
1646 timeout = schedule_timeout(30*HZ);
1647 spin_lock(&mdev->peer_seq_lock);
1648 if (timeout == 0 && p_seq == mdev->peer_seq) {
1649 ret = -ETIMEDOUT;
1650 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1651 break;
1652 }
1653 }
1654 finish_wait(&mdev->seq_wait, &wait);
1655 if (mdev->peer_seq+1 == packet_seq)
1656 mdev->peer_seq++;
1657 spin_unlock(&mdev->peer_seq_lock);
1658 return ret;
1659}
1660
1661/* mirrored write */
1662static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1663{
1664 sector_t sector;
1665 struct drbd_epoch_entry *e;
1666 struct p_data *p = (struct p_data *)h;
1667 int header_size, data_size;
1668 int rw = WRITE;
1669 u32 dp_flags;
1670
1671 header_size = sizeof(*p) - sizeof(*h);
1672 data_size = h->length - header_size;
1673
1674 ERR_IF(data_size == 0) return FALSE;
1675
1676 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1677 return FALSE;
1678
1679 if (!get_ldev(mdev)) {
1680 if (__ratelimit(&drbd_ratelimit_state))
1681 dev_err(DEV, "Can not write mirrored data block "
1682 "to local disk.\n");
1683 spin_lock(&mdev->peer_seq_lock);
1684 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1685 mdev->peer_seq++;
1686 spin_unlock(&mdev->peer_seq_lock);
1687
1688 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1689 atomic_inc(&mdev->current_epoch->epoch_size);
1690 return drbd_drain_block(mdev, data_size);
1691 }
1692
1693 /* get_ldev(mdev) successful.
1694 * Corresponding put_ldev done either below (on various errors),
1695 * or in drbd_endio_write_sec, if we successfully submit the data at
1696 * the end of this function. */
1697
1698 sector = be64_to_cpu(p->sector);
1699 e = read_in_block(mdev, p->block_id, sector, data_size);
1700 if (!e) {
1701 put_ldev(mdev);
1702 return FALSE;
1703 }
1704
1705 e->private_bio->bi_end_io = drbd_endio_write_sec;
1706 e->w.cb = e_end_block;
1707
1708 spin_lock(&mdev->epoch_lock);
1709 e->epoch = mdev->current_epoch;
1710 atomic_inc(&e->epoch->epoch_size);
1711 atomic_inc(&e->epoch->active);
1712
1713 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1714 struct drbd_epoch *epoch;
1715 /* Issue a barrier if we start a new epoch, and the previous epoch
1716 was not an epoch containing a single request which already was
1717 a Barrier. */
1718 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1719 if (epoch == e->epoch) {
1720 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1721 rw |= (1<<BIO_RW_BARRIER);
1722 e->flags |= EE_IS_BARRIER;
1723 } else {
1724 if (atomic_read(&epoch->epoch_size) > 1 ||
1725 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1726 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1727 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1728 rw |= (1<<BIO_RW_BARRIER);
1729 e->flags |= EE_IS_BARRIER;
1730 }
1731 }
1732 }
1733 spin_unlock(&mdev->epoch_lock);
1734
1735 dp_flags = be32_to_cpu(p->dp_flags);
1736 if (dp_flags & DP_HARDBARRIER) {
1737 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1738 /* rw |= (1<<BIO_RW_BARRIER); */
1739 }
1740 if (dp_flags & DP_RW_SYNC)
1741 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1742 if (dp_flags & DP_MAY_SET_IN_SYNC)
1743 e->flags |= EE_MAY_SET_IN_SYNC;
1744
1745 /* I'm the receiver, I do hold a net_cnt reference. */
1746 if (!mdev->net_conf->two_primaries) {
1747 spin_lock_irq(&mdev->req_lock);
1748 } else {
1749 /* don't get the req_lock yet,
1750 * we may sleep in drbd_wait_peer_seq */
1751 const int size = e->size;
1752 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1753 DEFINE_WAIT(wait);
1754 struct drbd_request *i;
1755 struct hlist_node *n;
1756 struct hlist_head *slot;
1757 int first;
1758
1759 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1760 BUG_ON(mdev->ee_hash == NULL);
1761 BUG_ON(mdev->tl_hash == NULL);
1762
1763 /* conflict detection and handling:
1764 * 1. wait on the sequence number,
1765 * in case this data packet overtook ACK packets.
1766 * 2. check our hash tables for conflicting requests.
1767 * we only need to walk the tl_hash, since an ee can not
1768 * have a conflict with an other ee: on the submitting
1769 * node, the corresponding req had already been conflicting,
1770 * and a conflicting req is never sent.
1771 *
1772 * Note: for two_primaries, we are protocol C,
1773 * so there cannot be any request that is DONE
1774 * but still on the transfer log.
1775 *
1776 * unconditionally add to the ee_hash.
1777 *
1778 * if no conflicting request is found:
1779 * submit.
1780 *
1781 * if any conflicting request is found
1782 * that has not yet been acked,
1783 * AND I have the "discard concurrent writes" flag:
1784 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1785 *
1786 * if any conflicting request is found:
1787 * block the receiver, waiting on misc_wait
1788 * until no more conflicting requests are there,
1789 * or we get interrupted (disconnect).
1790 *
1791 * we do not just write after local io completion of those
1792 * requests, but only after req is done completely, i.e.
1793 * we wait for the P_DISCARD_ACK to arrive!
1794 *
1795 * then proceed normally, i.e. submit.
1796 */
1797 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1798 goto out_interrupted;
1799
1800 spin_lock_irq(&mdev->req_lock);
1801
1802 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1803
1804#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1805 slot = tl_hash_slot(mdev, sector);
1806 first = 1;
1807 for (;;) {
1808 int have_unacked = 0;
1809 int have_conflict = 0;
1810 prepare_to_wait(&mdev->misc_wait, &wait,
1811 TASK_INTERRUPTIBLE);
1812 hlist_for_each_entry(i, n, slot, colision) {
1813 if (OVERLAPS) {
1814 /* only ALERT on first iteration,
1815 * we may be woken up early... */
1816 if (first)
1817 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1818 " new: %llus +%u; pending: %llus +%u\n",
1819 current->comm, current->pid,
1820 (unsigned long long)sector, size,
1821 (unsigned long long)i->sector, i->size);
1822 if (i->rq_state & RQ_NET_PENDING)
1823 ++have_unacked;
1824 ++have_conflict;
1825 }
1826 }
1827#undef OVERLAPS
1828 if (!have_conflict)
1829 break;
1830
1831 /* Discard Ack only for the _first_ iteration */
1832 if (first && discard && have_unacked) {
1833 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1834 (unsigned long long)sector);
1835 inc_unacked(mdev);
1836 e->w.cb = e_send_discard_ack;
1837 list_add_tail(&e->w.list, &mdev->done_ee);
1838
1839 spin_unlock_irq(&mdev->req_lock);
1840
1841 /* we could probably send that P_DISCARD_ACK ourselves,
1842 * but I don't like the receiver using the msock */
1843
1844 put_ldev(mdev);
1845 wake_asender(mdev);
1846 finish_wait(&mdev->misc_wait, &wait);
1847 return TRUE;
1848 }
1849
1850 if (signal_pending(current)) {
1851 hlist_del_init(&e->colision);
1852
1853 spin_unlock_irq(&mdev->req_lock);
1854
1855 finish_wait(&mdev->misc_wait, &wait);
1856 goto out_interrupted;
1857 }
1858
1859 spin_unlock_irq(&mdev->req_lock);
1860 if (first) {
1861 first = 0;
1862 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1863 "sec=%llus\n", (unsigned long long)sector);
1864 } else if (discard) {
1865 /* we had none on the first iteration.
1866 * there must be none now. */
1867 D_ASSERT(have_unacked == 0);
1868 }
1869 schedule();
1870 spin_lock_irq(&mdev->req_lock);
1871 }
1872 finish_wait(&mdev->misc_wait, &wait);
1873 }
1874
1875 list_add(&e->w.list, &mdev->active_ee);
1876 spin_unlock_irq(&mdev->req_lock);
1877
1878 switch (mdev->net_conf->wire_protocol) {
1879 case DRBD_PROT_C:
1880 inc_unacked(mdev);
1881 /* corresponding dec_unacked() in e_end_block()
1882 * respective _drbd_clear_done_ee */
1883 break;
1884 case DRBD_PROT_B:
1885 /* I really don't like it that the receiver thread
1886 * sends on the msock, but anyways */
1887 drbd_send_ack(mdev, P_RECV_ACK, e);
1888 break;
1889 case DRBD_PROT_A:
1890 /* nothing to do */
1891 break;
1892 }
1893
1894 if (mdev->state.pdsk == D_DISKLESS) {
1895 /* In case we have the only disk of the cluster, */
1896 drbd_set_out_of_sync(mdev, e->sector, e->size);
1897 e->flags |= EE_CALL_AL_COMPLETE_IO;
1898 drbd_al_begin_io(mdev, e->sector);
1899 }
1900
1901 e->private_bio->bi_rw = rw;
1902 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1903 /* accounting done in endio */
1904
1905 maybe_kick_lo(mdev);
1906 return TRUE;
1907
1908out_interrupted:
1909 /* yes, the epoch_size now is imbalanced.
1910 * but we drop the connection anyways, so we don't have a chance to
1911 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1912 put_ldev(mdev);
1913 drbd_free_ee(mdev, e);
1914 return FALSE;
1915}
1916
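/* Serve a read request from the peer: P_DATA_REQUEST, P_RS_DATA_REQUEST,
 * P_OV_REQUEST, P_OV_REPLY and P_CSUM_RS_REQUEST all land here.  We parse
 * the block request, allocate an epoch entry, pick the per-command
 * completion callback and submit a local READ; the answer to the peer is
 * sent by that callback once the read completes. */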
1917static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1918{
1919 sector_t sector;
1920 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1921 struct drbd_epoch_entry *e;
1922 struct digest_info *di = NULL;
1923 int size, digest_size;
1924 unsigned int fault_type;
1925 struct p_block_req *p =
1926 (struct p_block_req *)h;
1927 const int brps = sizeof(*p)-sizeof(*h);
1928
1929 if (drbd_recv(mdev, h->payload, brps) != brps)
1930 return FALSE;
1931
1932 sector = be64_to_cpu(p->sector);
1933 size = be32_to_cpu(p->blksize);
1934
1935 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1936 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1937 (unsigned long long)sector, size);
1938 return FALSE;
1939 }
1940 if (sector + (size>>9) > capacity) {
1941 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1942 (unsigned long long)sector, size);
1943 return FALSE;
1944 }
1945
1946 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1947 if (__ratelimit(&drbd_ratelimit_state))
1948 dev_err(DEV, "Can not satisfy peer's read request, "
1949 "no local data.\n");
1950 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1951 P_NEG_RS_DREPLY , p);
1952 return drbd_drain_block(mdev, h->length - brps);
1953 }
1954
1955 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1956 * "criss-cross" setup, that might cause write-out on some other DRBD,
1957 * which in turn might block on the other node at this very place. */
1958 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1959 if (!e) {
1960 put_ldev(mdev);
1961 return FALSE;
1962 }
1963
1964 e->private_bio->bi_rw = READ;
1965 e->private_bio->bi_end_io = drbd_endio_read_sec;
1966
1967 switch (h->command) {
1968 case P_DATA_REQUEST:
1969 e->w.cb = w_e_end_data_req;
1970 fault_type = DRBD_FAULT_DT_RD;
1971 break;
1972 case P_RS_DATA_REQUEST:
1973 e->w.cb = w_e_end_rsdata_req;
1974 fault_type = DRBD_FAULT_RS_RD;
1975 /* Eventually this should become asynchronous. Currently it
1976 * blocks the whole receiver just to delay the reading of a
1977 * resync data block.
1978 * the drbd_work_queue mechanism is made for this...
1979 */
1980 if (!drbd_rs_begin_io(mdev, sector)) {
1981 /* we have been interrupted,
1982 * probably connection lost! */
1983 D_ASSERT(signal_pending(current));
1984 goto out_free_e;
1985 }
1986 break;
1987
1988 case P_OV_REPLY:
1989 case P_CSUM_RS_REQUEST:
1990 fault_type = DRBD_FAULT_RS_RD;
1991 digest_size = h->length - brps ;
1992 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
1993 if (!di)
1994 goto out_free_e;
1995
1996 di->digest_size = digest_size;
1997 di->digest = (((char *)di)+sizeof(struct digest_info));
1998
1999 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2000 goto out_free_e;
2001
2002 e->block_id = (u64)(unsigned long)di;
2003 if (h->command == P_CSUM_RS_REQUEST) {
2004 D_ASSERT(mdev->agreed_pro_version >= 89);
2005 e->w.cb = w_e_end_csum_rs_req;
2006 } else if (h->command == P_OV_REPLY) {
2007 e->w.cb = w_e_end_ov_reply;
2008 dec_rs_pending(mdev);
2009 break;
2010 }
2011
2012 if (!drbd_rs_begin_io(mdev, sector)) {
2013 /* we have been interrupted, probably connection lost! */
2014 D_ASSERT(signal_pending(current));
2015 goto out_free_e;
2016 }
2017 break;
2018
2019 case P_OV_REQUEST:
2020 if (mdev->state.conn >= C_CONNECTED &&
2021 mdev->state.conn != C_VERIFY_T)
2022 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2023 drbd_conn_str(mdev->state.conn));
2024 if (mdev->ov_start_sector == ~(sector_t)0 &&
2025 mdev->agreed_pro_version >= 90) {
2026 mdev->ov_start_sector = sector;
2027 mdev->ov_position = sector;
2028 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2029 dev_info(DEV, "Online Verify start sector: %llu\n",
2030 (unsigned long long)sector);
2031 }
2032 e->w.cb = w_e_end_ov_req;
2033 fault_type = DRBD_FAULT_RS_RD;
2034 /* Eventually this should become asynchronous. Currently it
2035 * blocks the whole receiver just to delay the reading of a
2036 * resync data block.
2037 * the drbd_work_queue mechanism is made for this...
2038 */
2039 if (!drbd_rs_begin_io(mdev, sector)) {
2040 /* we have been interrupted,
2041 * probably connection lost! */
2042 D_ASSERT(signal_pending(current));
2043 goto out_free_e;
2044 }
2045 break;
2046
2047
2048 default:
2049 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2050 cmdname(h->command));
2051 fault_type = DRBD_FAULT_MAX;
2052 }
2053
2054 spin_lock_irq(&mdev->req_lock);
2055 list_add(&e->w.list, &mdev->read_ee);
2056 spin_unlock_irq(&mdev->req_lock);
2057
2058 inc_unacked(mdev);
2059
2060 drbd_generic_make_request(mdev, fault_type, e->private_bio);
2061 maybe_kick_lo(mdev);
2062
2063 return TRUE;
2064
2065out_free_e:
2066 kfree(di);
2067 put_ldev(mdev);
2068 drbd_free_ee(mdev, e);
2069 return FALSE;
2070}
2071
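/* After-split-brain auto-recovery policy for the "zero primaries" case.
 * Return convention (shared with the 1p/2p variants below):
 *   1  sync from this node, the peer's changes are discarded
 *  -1  sync from the peer, our changes are discarded
 * -100 no automatic decision possible */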
2072static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2073{
2074 int self, peer, rv = -100;
2075 unsigned long ch_self, ch_peer;
2076
2077 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2078 peer = mdev->p_uuid[UI_BITMAP] & 1;
2079
2080 ch_peer = mdev->p_uuid[UI_SIZE];
2081 ch_self = mdev->comm_bm_set;
2082
2083 switch (mdev->net_conf->after_sb_0p) {
2084 case ASB_CONSENSUS:
2085 case ASB_DISCARD_SECONDARY:
2086 case ASB_CALL_HELPER:
2087 dev_err(DEV, "Configuration error.\n");
2088 break;
2089 case ASB_DISCONNECT:
2090 break;
2091 case ASB_DISCARD_YOUNGER_PRI:
2092 if (self == 0 && peer == 1) {
2093 rv = -1;
2094 break;
2095 }
2096 if (self == 1 && peer == 0) {
2097 rv = 1;
2098 break;
2099 }
2100 /* Else fall through to one of the other strategies... */
2101 case ASB_DISCARD_OLDER_PRI:
2102 if (self == 0 && peer == 1) {
2103 rv = 1;
2104 break;
2105 }
2106 if (self == 1 && peer == 0) {
2107 rv = -1;
2108 break;
2109 }
2110 /* Else fall through to one of the other strategies... */
2111 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2112 "Using discard-least-changes instead\n");
2113 case ASB_DISCARD_ZERO_CHG:
2114 if (ch_peer == 0 && ch_self == 0) {
2115 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2116 ? -1 : 1;
2117 break;
2118 } else {
2119 if (ch_peer == 0) { rv = 1; break; }
2120 if (ch_self == 0) { rv = -1; break; }
2121 }
2122 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2123 break;
2124 case ASB_DISCARD_LEAST_CHG:
2125 if (ch_self < ch_peer)
2126 rv = -1;
2127 else if (ch_self > ch_peer)
2128 rv = 1;
2129 else /* ( ch_self == ch_peer ) */
2130 /* Well, then use something else. */
2131 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2132 ? -1 : 1;
2133 break;
2134 case ASB_DISCARD_LOCAL:
2135 rv = -1;
2136 break;
2137 case ASB_DISCARD_REMOTE:
2138 rv = 1;
2139 }
2140
2141 return rv;
2142}
2143
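/* After-split-brain auto-recovery when exactly one node was primary.
 * Most policies delegate to drbd_asb_recover_0p() and then accept or veto
 * the result depending on our current role; ASB_CALL_HELPER may in
 * addition demote this node and run the "pri-lost-after-sb" handler. */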
2144static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2145{
2146 int self, peer, hg, rv = -100;
2147
2148 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2149 peer = mdev->p_uuid[UI_BITMAP] & 1;
2150
2151 switch (mdev->net_conf->after_sb_1p) {
2152 case ASB_DISCARD_YOUNGER_PRI:
2153 case ASB_DISCARD_OLDER_PRI:
2154 case ASB_DISCARD_LEAST_CHG:
2155 case ASB_DISCARD_LOCAL:
2156 case ASB_DISCARD_REMOTE:
2157 dev_err(DEV, "Configuration error.\n");
2158 break;
2159 case ASB_DISCONNECT:
2160 break;
2161 case ASB_CONSENSUS:
2162 hg = drbd_asb_recover_0p(mdev);
2163 if (hg == -1 && mdev->state.role == R_SECONDARY)
2164 rv = hg;
2165 if (hg == 1 && mdev->state.role == R_PRIMARY)
2166 rv = hg;
2167 break;
2168 case ASB_VIOLENTLY:
2169 rv = drbd_asb_recover_0p(mdev);
2170 break;
2171 case ASB_DISCARD_SECONDARY:
2172 return mdev->state.role == R_PRIMARY ? 1 : -1;
2173 case ASB_CALL_HELPER:
2174 hg = drbd_asb_recover_0p(mdev);
2175 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2176 self = drbd_set_role(mdev, R_SECONDARY, 0);
2177 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2178 * we might be here in C_WF_REPORT_PARAMS which is transient.
2179 * we do not need to wait for the after state change work either. */
2180 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2181 if (self != SS_SUCCESS) {
2182 drbd_khelper(mdev, "pri-lost-after-sb");
2183 } else {
2184 dev_warn(DEV, "Successfully gave up primary role.\n");
2185 rv = hg;
2186 }
2187 } else
2188 rv = hg;
2189 }
2190
2191 return rv;
2192}
2193
2194static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2195{
2196 int self, peer, hg, rv = -100;
2197
2198 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2199 peer = mdev->p_uuid[UI_BITMAP] & 1;
2200
2201 switch (mdev->net_conf->after_sb_2p) {
2202 case ASB_DISCARD_YOUNGER_PRI:
2203 case ASB_DISCARD_OLDER_PRI:
2204 case ASB_DISCARD_LEAST_CHG:
2205 case ASB_DISCARD_LOCAL:
2206 case ASB_DISCARD_REMOTE:
2207 case ASB_CONSENSUS:
2208 case ASB_DISCARD_SECONDARY:
2209 dev_err(DEV, "Configuration error.\n");
2210 break;
2211 case ASB_VIOLENTLY:
2212 rv = drbd_asb_recover_0p(mdev);
2213 break;
2214 case ASB_DISCONNECT:
2215 break;
2216 case ASB_CALL_HELPER:
2217 hg = drbd_asb_recover_0p(mdev);
2218 if (hg == -1) {
2219 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2220 * we might be here in C_WF_REPORT_PARAMS which is transient.
2221 * we do not need to wait for the after state change work either. */
2222 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2223 if (self != SS_SUCCESS) {
2224 drbd_khelper(mdev, "pri-lost-after-sb");
2225 } else {
2226 dev_warn(DEV, "Successfully gave up primary role.\n");
2227 rv = hg;
2228 }
2229 } else
2230 rv = hg;
2231 }
2232
2233 return rv;
2234}
2235
2236static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2237 u64 bits, u64 flags)
2238{
2239 if (!uuid) {
2240 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2241 return;
2242 }
2243 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2244 text,
2245 (unsigned long long)uuid[UI_CURRENT],
2246 (unsigned long long)uuid[UI_BITMAP],
2247 (unsigned long long)uuid[UI_HISTORY_START],
2248 (unsigned long long)uuid[UI_HISTORY_END],
2249 (unsigned long long)bits,
2250 (unsigned long long)flags);
2251}
2252
2253/*
2254 100 after split brain try auto recover
2255 2 C_SYNC_SOURCE set BitMap
2256 1 C_SYNC_SOURCE use BitMap
2257 0 no Sync
2258 -1 C_SYNC_TARGET use BitMap
2259 -2 C_SYNC_TARGET set BitMap
2260 -100 after split brain, disconnect
2261-1000 unrelated data
2262 */
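/* The value stored through *rule_nr identifies which of the checks below
 * made the decision; drbd_sync_handshake() logs it as
 * "uuid_compare()=%d by rule %d", which makes handshake decisions traceable. */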
2263static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2264{
2265 u64 self, peer;
2266 int i, j;
2267
2268 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2269 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2270
2271 *rule_nr = 10;
2272 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2273 return 0;
2274
2275 *rule_nr = 20;
2276 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2277 peer != UUID_JUST_CREATED)
2278 return -2;
2279
2280 *rule_nr = 30;
2281 if (self != UUID_JUST_CREATED &&
2282 (peer == UUID_JUST_CREATED || peer == (u64)0))
2283 return 2;
2284
2285 if (self == peer) {
2286 int rct, dc; /* roles at crash time */
2287
2288 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2289
2290 if (mdev->agreed_pro_version < 91)
2291 return -1001;
2292
2293 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2294 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2295 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2296 drbd_uuid_set_bm(mdev, 0UL);
2297
2298 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2299 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2300 *rule_nr = 34;
2301 } else {
2302 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2303 *rule_nr = 36;
2304 }
2305
2306 return 1;
2307 }
2308
2309 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2310
2311 if (mdev->agreed_pro_version < 91)
2312 return -1001;
2313
2314 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2315 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2316 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2317
2318 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2319 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2320 mdev->p_uuid[UI_BITMAP] = 0UL;
2321
2322 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2323 *rule_nr = 35;
2324 } else {
2325 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2326 *rule_nr = 37;
2327 }
2328
2329 return -1;
2330 }
2331
2332 /* Common power [off|failure] */
2333 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2334 (mdev->p_uuid[UI_FLAGS] & 2);
2335 /* lowest bit is set when we were primary,
2336 * next bit (weight 2) is set when peer was primary */
2337 *rule_nr = 40;
2338
2339 switch (rct) {
2340 case 0: /* !self_pri && !peer_pri */ return 0;
2341 case 1: /* self_pri && !peer_pri */ return 1;
2342 case 2: /* !self_pri && peer_pri */ return -1;
2343 case 3: /* self_pri && peer_pri */
2344 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2345 return dc ? -1 : 1;
2346 }
2347 }
2348
2349 *rule_nr = 50;
2350 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2351 if (self == peer)
2352 return -1;
2353
2354 *rule_nr = 51;
2355 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2356 if (self == peer) {
2357 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2358 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2359 if (self == peer) {
2360 /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2361 modifications from its last start of resync as sync source. */
2362
2363 if (mdev->agreed_pro_version < 91)
2364 return -1001;
2365
2366 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2367 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2368 return -1;
2369 }
2370 }
2371
2372 *rule_nr = 60;
2373 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2374 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2375 peer = mdev->p_uuid[i] & ~((u64)1);
2376 if (self == peer)
2377 return -2;
2378 }
2379
2380 *rule_nr = 70;
2381 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2382 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2383 if (self == peer)
2384 return 1;
2385
2386 *rule_nr = 71;
2387 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2388 if (self == peer) {
2389 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2390 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2391 if (self == peer) {
2392 /* The last P_SYNC_UUID did not get through. Undo our own UUID
2393 modifications from our last start of resync as sync source. */
2394
2395 if (mdev->agreed_pro_version < 91)
2396 return -1001;
2397
2398 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2399 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2400
2401 dev_info(DEV, "Undid last start of resync:\n");
2402
2403 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2404 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2405
2406 return 1;
2407 }
2408 }
2409
2410
2411 *rule_nr = 80;
2412 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2413 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2414 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2415 if (self == peer)
2416 return 2;
2417 }
2418
2419 *rule_nr = 90;
2420 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2421 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2422 if (self == peer && self != ((u64)0))
2423 return 100;
2424
2425 *rule_nr = 100;
2426 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2427 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2428 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2429 peer = mdev->p_uuid[j] & ~((u64)1);
2430 if (self == peer)
2431 return -100;
2432 }
2433 }
2434
2435 return -1000;
2436}
2437
2438/* drbd_sync_handshake() returns the new conn state on success, or
2439 CONN_MASK (-1) on failure.
2440 */
2441static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2442 enum drbd_disk_state peer_disk) __must_hold(local)
2443{
2444 int hg, rule_nr;
2445 enum drbd_conns rv = C_MASK;
2446 enum drbd_disk_state mydisk;
2447
2448 mydisk = mdev->state.disk;
2449 if (mydisk == D_NEGOTIATING)
2450 mydisk = mdev->new_state_tmp.disk;
2451
2452 dev_info(DEV, "drbd_sync_handshake:\n");
2453 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2454 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2455 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2456
2457 hg = drbd_uuid_compare(mdev, &rule_nr);
2458
2459 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2460
2461 if (hg == -1000) {
2462 dev_alert(DEV, "Unrelated data, aborting!\n");
2463 return C_MASK;
2464 }
2465 if (hg == -1001) {
2466 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2467 return C_MASK;
2468 }
2469
2470 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2471 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2472 int f = (hg == -100) || abs(hg) == 2;
2473 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2474 if (f)
2475 hg = hg*2;
2476 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2477 hg > 0 ? "source" : "target");
2478 }
2479
2480 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2481 int pcount = (mdev->state.role == R_PRIMARY)
2482 + (peer_role == R_PRIMARY);
2483 int forced = (hg == -100);
2484
2485 switch (pcount) {
2486 case 0:
2487 hg = drbd_asb_recover_0p(mdev);
2488 break;
2489 case 1:
2490 hg = drbd_asb_recover_1p(mdev);
2491 break;
2492 case 2:
2493 hg = drbd_asb_recover_2p(mdev);
2494 break;
2495 }
2496 if (abs(hg) < 100) {
2497 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2498 "automatically solved. Sync from %s node\n",
2499 pcount, (hg < 0) ? "peer" : "this");
2500 if (forced) {
2501 dev_warn(DEV, "Doing a full sync, since"
2502 " UUIDs where ambiguous.\n");
2503 hg = hg*2;
2504 }
2505 }
2506 }
2507
2508 if (hg == -100) {
2509 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2510 hg = -1;
2511 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2512 hg = 1;
2513
2514 if (abs(hg) < 100)
2515 dev_warn(DEV, "Split-Brain detected, manually solved. "
2516 "Sync from %s node\n",
2517 (hg < 0) ? "peer" : "this");
2518 }
2519
2520 if (hg == -100) {
2521 /* FIXME this log message is not correct if we end up here
2522 * after an attempted attach on a diskless node.
2523 * We just refuse to attach -- well, we drop the "connection"
2524 * to that disk, in a way... */
2525 dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2526 drbd_khelper(mdev, "split-brain");
2527 return C_MASK;
2528 }
2529
2530 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2531 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2532 return C_MASK;
2533 }
2534
2535 if (hg < 0 && /* by intention we do not use mydisk here. */
2536 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2537 switch (mdev->net_conf->rr_conflict) {
2538 case ASB_CALL_HELPER:
2539 drbd_khelper(mdev, "pri-lost");
2540 /* fall through */
2541 case ASB_DISCONNECT:
2542 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2543 return C_MASK;
2544 case ASB_VIOLENTLY:
2545 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2546 " assumption\n");
2547 }
2548 }
2549
2550 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2551 if (hg == 0)
2552 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2553 else
2554 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2555 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2556 abs(hg) >= 2 ? "full" : "bit-map based");
2557 return C_MASK;
2558 }
2559
2560 if (abs(hg) >= 2) {
2561 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2562 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2563 return C_MASK;
2564 }
2565
2566 if (hg > 0) { /* become sync source. */
2567 rv = C_WF_BITMAP_S;
2568 } else if (hg < 0) { /* become sync target */
2569 rv = C_WF_BITMAP_T;
2570 } else {
2571 rv = C_CONNECTED;
2572 if (drbd_bm_total_weight(mdev)) {
2573 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2574 drbd_bm_total_weight(mdev));
2575 }
2576 }
2577
2578 return rv;
2579}
2580
2581/* returns 1 if invalid */
2582static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2583{
2584 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2585 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2586 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2587 return 0;
2588
2589 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2590 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2591 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2592 return 1;
2593
2594 /* everything else is valid if they are equal on both sides. */
2595 if (peer == self)
2596 return 0;
2597
2598 /* everything else is invalid. */
2599 return 1;
2600}
2601
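/* Handle P_PROTOCOL: compare the peer's connection settings (wire protocol,
 * after-split-brain policies, two-primaries, want-lose, data-integrity-alg)
 * with our net_conf; any mismatch logs an error and forces the connection
 * into C_DISCONNECTING. */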
2602static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2603{
2604 struct p_protocol *p = (struct p_protocol *)h;
2605 int header_size, data_size;
2606 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2607 int p_want_lose, p_two_primaries, cf;
2608 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2609
2610 header_size = sizeof(*p) - sizeof(*h);
2611 data_size = h->length - header_size;
2612
2613 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2614 return FALSE;
2615
2616 p_proto = be32_to_cpu(p->protocol);
2617 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2618 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2619 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2620 p_two_primaries = be32_to_cpu(p->two_primaries);
2621 cf = be32_to_cpu(p->conn_flags);
2622 p_want_lose = cf & CF_WANT_LOSE;
2623
2624 clear_bit(CONN_DRY_RUN, &mdev->flags);
2625
2626 if (cf & CF_DRY_RUN)
2627 set_bit(CONN_DRY_RUN, &mdev->flags);
2628
2629 if (p_proto != mdev->net_conf->wire_protocol) {
2630 dev_err(DEV, "incompatible communication protocols\n");
2631 goto disconnect;
2632 }
2633
2634 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2635 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2636 goto disconnect;
2637 }
2638
2639 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2640 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2641 goto disconnect;
2642 }
2643
2644 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2645 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2646 goto disconnect;
2647 }
2648
2649 if (p_want_lose && mdev->net_conf->want_lose) {
2650 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2651 goto disconnect;
2652 }
2653
2654 if (p_two_primaries != mdev->net_conf->two_primaries) {
2655 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2656 goto disconnect;
2657 }
2658
2659 if (mdev->agreed_pro_version >= 87) {
2660 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2661
2662 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2663 return FALSE;
2664
2665 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2666 if (strcmp(p_integrity_alg, my_alg)) {
2667 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2668 goto disconnect;
2669 }
2670 dev_info(DEV, "data-integrity-alg: %s\n",
2671 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2672 }
2673
2674 return TRUE;
2675
2676disconnect:
2677 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2678 return FALSE;
2679}
2680
2681/* helper function
2682 * input: alg name, feature name
2683 * return: NULL (alg name was "")
2684 * ERR_PTR(error) if something goes wrong
2685 * or the crypto hash ptr, if it worked out ok. */
2686struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2687 const char *alg, const char *name)
2688{
2689 struct crypto_hash *tfm;
2690
2691 if (!alg[0])
2692 return NULL;
2693
2694 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2695 if (IS_ERR(tfm)) {
2696 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2697 alg, name, PTR_ERR(tfm));
2698 return tfm;
2699 }
2700 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2701 crypto_free_hash(tfm);
2702 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2703 return ERR_PTR(-EINVAL);
2704 }
2705 return tfm;
2706}
2707
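/* Handle P_SYNC_PARAM / P_SYNC_PARAM89: pick up the peer's resync rate and,
 * for protocol >= 88, the verify-alg / csums-alg names.  New digests are
 * allocated first and only installed under peer_seq_lock, which serializes
 * against drbd_nl_syncer_conf(). */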
2708static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2709{
2710 int ok = TRUE;
2711 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2712 unsigned int header_size, data_size, exp_max_sz;
2713 struct crypto_hash *verify_tfm = NULL;
2714 struct crypto_hash *csums_tfm = NULL;
2715 const int apv = mdev->agreed_pro_version;
2716
2717 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2718 : apv == 88 ? sizeof(struct p_rs_param)
2719 + SHARED_SECRET_MAX
2720 : /* 89 */ sizeof(struct p_rs_param_89);
2721
2722 if (h->length > exp_max_sz) {
2723 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2724 h->length, exp_max_sz);
2725 return FALSE;
2726 }
2727
2728 if (apv <= 88) {
2729 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2730 data_size = h->length - header_size;
2731 } else /* apv >= 89 */ {
2732 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2733 data_size = h->length - header_size;
2734 D_ASSERT(data_size == 0);
2735 }
2736
2737 /* initialize verify_alg and csums_alg */
2738 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2739
2740 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2741 return FALSE;
2742
2743 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2744
2745 if (apv >= 88) {
2746 if (apv == 88) {
2747 if (data_size > SHARED_SECRET_MAX) {
2748 dev_err(DEV, "verify-alg too long, "
2749 "peer wants %u, accepting only %u byte\n",
2750 data_size, SHARED_SECRET_MAX);
2751 return FALSE;
2752 }
2753
2754 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2755 return FALSE;
2756
2757 /* we expect NUL terminated string */
2758 /* but just in case someone tries to be evil */
2759 D_ASSERT(p->verify_alg[data_size-1] == 0);
2760 p->verify_alg[data_size-1] = 0;
2761
2762 } else /* apv >= 89 */ {
2763 /* we still expect NUL terminated strings */
2764 /* but just in case someone tries to be evil */
2765 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2766 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2767 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2768 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2769 }
2770
2771 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2772 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2773 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2774 mdev->sync_conf.verify_alg, p->verify_alg);
2775 goto disconnect;
2776 }
2777 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2778 p->verify_alg, "verify-alg");
2779 if (IS_ERR(verify_tfm)) {
2780 verify_tfm = NULL;
2781 goto disconnect;
2782 }
2783 }
2784
2785 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2786 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2787 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2788 mdev->sync_conf.csums_alg, p->csums_alg);
2789 goto disconnect;
2790 }
2791 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2792 p->csums_alg, "csums-alg");
2793 if (IS_ERR(csums_tfm)) {
2794 csums_tfm = NULL;
2795 goto disconnect;
2796 }
2797 }
2798
2799
2800 spin_lock(&mdev->peer_seq_lock);
2801 /* lock against drbd_nl_syncer_conf() */
2802 if (verify_tfm) {
2803 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2804 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2805 crypto_free_hash(mdev->verify_tfm);
2806 mdev->verify_tfm = verify_tfm;
2807 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2808 }
2809 if (csums_tfm) {
2810 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2811 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2812 crypto_free_hash(mdev->csums_tfm);
2813 mdev->csums_tfm = csums_tfm;
2814 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2815 }
2816 spin_unlock(&mdev->peer_seq_lock);
2817 }
2818
2819 return ok;
2820disconnect:
2821 /* just for completeness: actually not needed,
2822 * as this is not reached if csums_tfm was ok. */
2823 crypto_free_hash(csums_tfm);
2824 /* but free the verify_tfm again, if csums_tfm did not work out */
2825 crypto_free_hash(verify_tfm);
2826 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2827 return FALSE;
2828}
2829
2830static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2831{
2832 /* sorry, we currently have no working implementation
2833 * of distributed TCQ */
2834}
2835
2836/* warn if the arguments differ by more than 12.5% */
2837static void warn_if_differ_considerably(struct drbd_conf *mdev,
2838 const char *s, sector_t a, sector_t b)
2839{
2840 sector_t d;
2841 if (a == 0 || b == 0)
2842 return;
2843 d = (a > b) ? (a - b) : (b - a);
2844 if (d > (a>>3) || d > (b>>3))
2845 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2846 (unsigned long long)a, (unsigned long long)b);
2847}
2848
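/* Handle P_SIZES: check the peer's backing device and requested sizes
 * against ours, possibly adjust our capacity (never shrink a device with
 * usable data while connecting), adopt the peer's maximum segment size if
 * it differs, and kick off a resync after an online grow if required. */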
2849static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2850{
2851 struct p_sizes *p = (struct p_sizes *)h;
2852 enum determine_dev_size dd = unchanged;
2853 unsigned int max_seg_s;
2854 sector_t p_size, p_usize, my_usize;
2855 int ldsc = 0; /* local disk size changed */
2856 enum drbd_conns nconn;
2857
2858 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2859 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2860 return FALSE;
2861
2862 p_size = be64_to_cpu(p->d_size);
2863 p_usize = be64_to_cpu(p->u_size);
2864
2865 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2866 dev_err(DEV, "some backing storage is needed\n");
2867 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2868 return FALSE;
2869 }
2870
2871 /* just store the peer's disk size for now.
2872 * we still need to figure out whether we accept that. */
2873 mdev->p_size = p_size;
2874
2875#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2876 if (get_ldev(mdev)) {
2877 warn_if_differ_considerably(mdev, "lower level device sizes",
2878 p_size, drbd_get_max_capacity(mdev->ldev));
2879 warn_if_differ_considerably(mdev, "user requested size",
2880 p_usize, mdev->ldev->dc.disk_size);
2881
2882 /* if this is the first connect, or an otherwise expected
2883 * param exchange, choose the minimum */
2884 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2885 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2886 p_usize);
2887
2888 my_usize = mdev->ldev->dc.disk_size;
2889
2890 if (mdev->ldev->dc.disk_size != p_usize) {
2891 mdev->ldev->dc.disk_size = p_usize;
2892 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2893 (unsigned long)mdev->ldev->dc.disk_size);
2894 }
2895
2896 /* Never shrink a device with usable data during connect.
2897 But allow online shrinking if we are connected. */
2898 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2899 drbd_get_capacity(mdev->this_bdev) &&
2900 mdev->state.disk >= D_OUTDATED &&
2901 mdev->state.conn < C_CONNECTED) {
2902 dev_err(DEV, "The peer's disk size is too small!\n");
2903 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2904 mdev->ldev->dc.disk_size = my_usize;
2905 put_ldev(mdev);
2906 return FALSE;
2907 }
2908 put_ldev(mdev);
2909 }
2910#undef min_not_zero
2911
2912 if (get_ldev(mdev)) {
2913 dd = drbd_determin_dev_size(mdev, 0);
2914 put_ldev(mdev);
2915 if (dd == dev_size_error)
2916 return FALSE;
2917 drbd_md_sync(mdev);
2918 } else {
2919 /* I am diskless, need to accept the peer's size. */
2920 drbd_set_my_capacity(mdev, p_size);
2921 }
2922
2923 if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2924 nconn = drbd_sync_handshake(mdev,
2925 mdev->state.peer, mdev->state.pdsk);
2926 put_ldev(mdev);
2927
2928 if (nconn == C_MASK) {
2929 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2930 return FALSE;
2931 }
2932
2933 if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2934 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2935 return FALSE;
2936 }
2937 }
2938
2939 if (get_ldev(mdev)) {
2940 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2941 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2942 ldsc = 1;
2943 }
2944
2945 max_seg_s = be32_to_cpu(p->max_segment_size);
2946 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2947 drbd_setup_queue_param(mdev, max_seg_s);
2948
2949 drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2950 put_ldev(mdev);
2951 }
2952
2953 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2954 if (be64_to_cpu(p->c_size) !=
2955 drbd_get_capacity(mdev->this_bdev) || ldsc) {
2956 /* we have different sizes, probably peer
2957 * needs to know my new size... */
2958 drbd_send_sizes(mdev, 0);
2959 }
2960 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2961 (dd == grew && mdev->state.conn == C_CONNECTED)) {
2962 if (mdev->state.pdsk >= D_INCONSISTENT &&
2963 mdev->state.disk >= D_INCONSISTENT)
2964 resync_after_online_grow(mdev);
2965 else
2966 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2967 }
2968 }
2969
2970 return TRUE;
2971}
2972
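/* Handle P_UUIDS: store the peer's UUID set in mdev->p_uuid.  If both nodes
 * agree that this is a freshly created pair of disks, the initial full sync
 * can be skipped by clearing the bitmap and going UpToDate/UpToDate right
 * away. */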
2973static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2974{
2975 struct p_uuids *p = (struct p_uuids *)h;
2976 u64 *p_uuid;
2977 int i;
2978
2979 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2980 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2981 return FALSE;
2982
2983 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
2984
2985 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2986 p_uuid[i] = be64_to_cpu(p->uuid[i]);
2987
2988 kfree(mdev->p_uuid);
2989 mdev->p_uuid = p_uuid;
2990
2991 if (mdev->state.conn < C_CONNECTED &&
2992 mdev->state.disk < D_INCONSISTENT &&
2993 mdev->state.role == R_PRIMARY &&
2994 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2995 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2996 (unsigned long long)mdev->ed_uuid);
2997 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2998 return FALSE;
2999 }
3000
3001 if (get_ldev(mdev)) {
3002 int skip_initial_sync =
3003 mdev->state.conn == C_CONNECTED &&
3004 mdev->agreed_pro_version >= 90 &&
3005 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3006 (p_uuid[UI_FLAGS] & 8);
3007 if (skip_initial_sync) {
3008 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3009 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3010 "clear_n_write from receive_uuids");
3011 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3012 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3013 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3014 CS_VERBOSE, NULL);
3015 drbd_md_sync(mdev);
3016 }
3017 put_ldev(mdev);
3018 }
3019
3020 /* Before we test for the disk state, we should wait until a possibly
3021 ongoing cluster-wide state change has finished. That is important if
3022 we are primary and are detaching from our disk. We need to see the
3023 new disk state... */
3024 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3025 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3026 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3027
3028 return TRUE;
3029}
3030
3031/**
3032 * convert_state() - Converts the peer's view of the cluster state to our point of view
3033 * @ps: The state as seen by the peer.
3034 */
3035static union drbd_state convert_state(union drbd_state ps)
3036{
3037 union drbd_state ms;
3038
3039 static enum drbd_conns c_tab[] = {
3040 [C_CONNECTED] = C_CONNECTED,
3041
3042 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3043 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3044 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3045 [C_VERIFY_S] = C_VERIFY_T,
3046 [C_MASK] = C_MASK,
3047 };
3048
3049 ms.i = ps.i;
3050
3051 ms.conn = c_tab[ps.conn];
3052 ms.peer = ps.role;
3053 ms.role = ps.peer;
3054 ms.pdsk = ps.disk;
3055 ms.disk = ps.pdsk;
3056 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3057
3058 return ms;
3059}
3060
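/* Handle P_STATE_CHG_REQ: the peer asks us to carry out a cluster-wide
 * state change.  Translate the request into our point of view with
 * convert_state(), apply it, and report the result back through
 * drbd_send_sr_reply(). */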
3061static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3062{
3063 struct p_req_state *p = (struct p_req_state *)h;
3064 union drbd_state mask, val;
3065 int rv;
3066
3067 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3068 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3069 return FALSE;
3070
3071 mask.i = be32_to_cpu(p->mask);
3072 val.i = be32_to_cpu(p->val);
3073
3074 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3075 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3076 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3077 return TRUE;
3078 }
3079
3080 mask = convert_state(mask);
3081 val = convert_state(val);
3082
3083 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3084
3085 drbd_send_sr_reply(mdev, rv);
3086 drbd_md_sync(mdev);
3087
3088 return TRUE;
3089}
3090
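/* Handle P_STATE: fold the peer's state report into our own state.  This is
 * also where a resync may be started via drbd_sync_handshake(), e.g. on a
 * fresh connection or after one side (re)attached its disk. */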
3091static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3092{
3093 struct p_state *p = (struct p_state *)h;
3094 enum drbd_conns nconn, oconn;
3095 union drbd_state ns, peer_state;
3096 enum drbd_disk_state real_peer_disk;
3097 int rv;
3098
3099 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3100 return FALSE;
3101
3102 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3103 return FALSE;
3104
3105 peer_state.i = be32_to_cpu(p->state);
3106
3107 real_peer_disk = peer_state.disk;
3108 if (peer_state.disk == D_NEGOTIATING) {
3109 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3110 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3111 }
3112
3113 spin_lock_irq(&mdev->req_lock);
3114 retry:
3115 oconn = nconn = mdev->state.conn;
3116 spin_unlock_irq(&mdev->req_lock);
3117
3118 if (nconn == C_WF_REPORT_PARAMS)
3119 nconn = C_CONNECTED;
3120
3121 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3122 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3123 int cr; /* consider resync */
3124
3125 /* if we established a new connection */
3126 cr = (oconn < C_CONNECTED);
3127 /* if we had an established connection
3128 * and one of the nodes newly attaches a disk */
3129 cr |= (oconn == C_CONNECTED &&
3130 (peer_state.disk == D_NEGOTIATING ||
3131 mdev->state.disk == D_NEGOTIATING));
3132 /* if we have both been inconsistent, and the peer has been
3133 * forced to be UpToDate with --overwrite-data */
3134 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3135 /* if we had been plain connected, and the admin requested to
3136 * start a sync by "invalidate" or "invalidate-remote" */
3137 cr |= (oconn == C_CONNECTED &&
3138 (peer_state.conn >= C_STARTING_SYNC_S &&
3139 peer_state.conn <= C_WF_BITMAP_T));
3140
3141 if (cr)
3142 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3143
3144 put_ldev(mdev);
3145 if (nconn == C_MASK) {
3146 nconn = C_CONNECTED;
3147 if (mdev->state.disk == D_NEGOTIATING) {
3148 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3149 } else if (peer_state.disk == D_NEGOTIATING) {
3150 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3151 peer_state.disk = D_DISKLESS;
3152 real_peer_disk = D_DISKLESS;
3153 } else {
3154 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3155 return FALSE;
3156 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3157 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3158 return FALSE;
3159 }
3160 }
3161 }
3162
3163 spin_lock_irq(&mdev->req_lock);
3164 if (mdev->state.conn != oconn)
3165 goto retry;
3166 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3167 ns.i = mdev->state.i;
3168 ns.conn = nconn;
3169 ns.peer = peer_state.role;
3170 ns.pdsk = real_peer_disk;
3171 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3172 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3173 ns.disk = mdev->new_state_tmp.disk;
3174
3175 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3176 ns = mdev->state;
3177 spin_unlock_irq(&mdev->req_lock);
3178
3179 if (rv < SS_SUCCESS) {
3180 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3181 return FALSE;
3182 }
3183
3184 if (oconn > C_WF_REPORT_PARAMS) {
3185 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3186 peer_state.disk != D_NEGOTIATING ) {
3187 /* we want resync, peer has not yet decided to sync... */
3188 /* Nowadays only used when forcing a node into primary role and
3189 setting its disk to UpToDate with that */
3190 drbd_send_uuids(mdev);
3191 drbd_send_state(mdev);
3192 }
3193 }
3194
3195 mdev->net_conf->want_lose = 0;
3196
3197 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3198
3199 return TRUE;
3200}
3201
3202static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3203{
3204 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3205
3206 wait_event(mdev->misc_wait,
3207 mdev->state.conn == C_WF_SYNC_UUID ||
3208 mdev->state.conn < C_CONNECTED ||
3209 mdev->state.disk < D_NEGOTIATING);
3210
3211 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3212
3213 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3214 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3215 return FALSE;
3216
3217 /* Here the _drbd_uuid_ functions are right, current should
3218 _not_ be rotated into the history */
3219 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3220 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3221 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3222
3223 drbd_start_resync(mdev, C_SYNC_TARGET);
3224
3225 put_ldev(mdev);
3226 } else
3227 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3228
3229 return TRUE;
3230}
3231
3232enum receive_bitmap_ret { OK, DONE, FAILED };
3233
3234static enum receive_bitmap_ret
3235receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3236 unsigned long *buffer, struct bm_xfer_ctx *c)
3237{
3238 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3239 unsigned want = num_words * sizeof(long);
3240
3241 if (want != h->length) {
3242 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3243 return FAILED;
3244 }
3245 if (want == 0)
3246 return DONE;
3247 if (drbd_recv(mdev, buffer, want) != want)
3248 return FAILED;
3249
3250 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3251
3252 c->word_offset += num_words;
3253 c->bit_offset = c->word_offset * BITS_PER_LONG;
3254 if (c->bit_offset > c->bm_bits)
3255 c->bit_offset = c->bm_bits;
3256
3257 return OK;
3258}
3259
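/* Decode one RLE/VLI compressed bitmap chunk: the payload is a bit stream
 * of variable-length encoded run lengths, alternating between runs of clear
 * and set bits (initial polarity from DCBP_get_start()).  Runs of set bits
 * are applied with _drbd_bm_set_bits(); runs of clear bits only advance the
 * bit offset. */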
3260static enum receive_bitmap_ret
3261recv_bm_rle_bits(struct drbd_conf *mdev,
3262 struct p_compressed_bm *p,
3263 struct bm_xfer_ctx *c)
3264{
3265 struct bitstream bs;
3266 u64 look_ahead;
3267 u64 rl;
3268 u64 tmp;
3269 unsigned long s = c->bit_offset;
3270 unsigned long e;
3271 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3272 int toggle = DCBP_get_start(p);
3273 int have;
3274 int bits;
3275
3276 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3277
3278 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3279 if (bits < 0)
3280 return FAILED;
3281
3282 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3283 bits = vli_decode_bits(&rl, look_ahead);
3284 if (bits <= 0)
3285 return FAILED;
3286
3287 if (toggle) {
3288 e = s + rl -1;
3289 if (e >= c->bm_bits) {
3290 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3291 return FAILED;
3292 }
3293 _drbd_bm_set_bits(mdev, s, e);
3294 }
3295
3296 if (have < bits) {
3297 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3298 have, bits, look_ahead,
3299 (unsigned int)(bs.cur.b - p->code),
3300 (unsigned int)bs.buf_len);
3301 return FAILED;
3302 }
3303 look_ahead >>= bits;
3304 have -= bits;
3305
3306 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3307 if (bits < 0)
3308 return FAILED;
3309 look_ahead |= tmp << have;
3310 have += bits;
3311 }
3312
3313 c->bit_offset = s;
3314 bm_xfer_ctx_bit_to_word_offset(c);
3315
3316 return (s == c->bm_bits) ? DONE : OK;
3317}
3318
3319static enum receive_bitmap_ret
3320decode_bitmap_c(struct drbd_conf *mdev,
3321 struct p_compressed_bm *p,
3322 struct bm_xfer_ctx *c)
3323{
3324 if (DCBP_get_code(p) == RLE_VLI_Bits)
3325 return recv_bm_rle_bits(mdev, p, c);
3326
3327 /* other variants had been implemented for evaluation,
3328 * but have been dropped as this one turned out to be "best"
3329 * during all our tests. */
3330
3331 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3332 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3333 return FAILED;
3334}
3335
3336void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3337 const char *direction, struct bm_xfer_ctx *c)
3338{
3339 /* what would it take to transfer it "plaintext" */
3340 unsigned plain = sizeof(struct p_header) *
3341 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3342 + c->bm_words * sizeof(long);
3343 unsigned total = c->bytes[0] + c->bytes[1];
3344 unsigned r;
3345
3346 /* total can not be zero. but just in case: */
3347 if (total == 0)
3348 return;
3349
3350 /* don't report if not compressed */
3351 if (total >= plain)
3352 return;
3353
3354 /* total < plain. check for overflow, still */
3355 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3356 : (1000 * total / plain);
3357
3358 if (r > 1000)
3359 r = 1000;
3360
3361 r = 1000 - r;
3362 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3363 "total %u; compression: %u.%u%%\n",
3364 direction,
3365 c->bytes[1], c->packets[1],
3366 c->bytes[0], c->packets[0],
3367 total, r/10, r % 10);
3368}
3369
3370/* Since we are processing the bitfield from lower addresses to higher,
3371 it does not matter whether we process it in 32 bit chunks or 64 bit
3372 chunks, as long as it is little endian. (Understand it as a byte stream,
3373 beginning with the lowest byte...) If we used big endian,
3374 we would need to process it from the highest address to the lowest
3375 in order to be agnostic to the 32 vs 64 bit issue.
3376
3377 returns 0 on failure, 1 if we successfully received it. */
3378static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3379{
3380 struct bm_xfer_ctx c;
3381 void *buffer;
3382 enum receive_bitmap_ret ret;
3383 int ok = FALSE;
3384
3385 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3386
3387 drbd_bm_lock(mdev, "receive bitmap");
3388
3389 /* maybe we should use some per thread scratch page,
3390 * and allocate that during initial device creation? */
3391 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3392 if (!buffer) {
3393 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3394 goto out;
3395 }
3396
3397 c = (struct bm_xfer_ctx) {
3398 .bm_bits = drbd_bm_bits(mdev),
3399 .bm_words = drbd_bm_words(mdev),
3400 };
3401
3402 do {
3403 if (h->command == P_BITMAP) {
3404 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3405 } else if (h->command == P_COMPRESSED_BITMAP) {
3406 /* MAYBE: sanity check that we speak proto >= 90,
3407 * and the feature is enabled! */
3408 struct p_compressed_bm *p;
3409
3410 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3411 dev_err(DEV, "ReportCBitmap packet too large\n");
3412 goto out;
3413 }
3414 /* use the page buff */
3415 p = buffer;
3416 memcpy(p, h, sizeof(*h));
3417 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3418 goto out;
3419 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3420 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3421 goto out;
3422 }
3423 ret = decode_bitmap_c(mdev, p, &c);
3424 } else {
3425 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3426 goto out;
3427 }
3428
3429 c.packets[h->command == P_BITMAP]++;
3430 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3431
3432 if (ret != OK)
3433 break;
3434
3435 if (!drbd_recv_header(mdev, h))
3436 goto out;
3437 } while (ret == OK);
3438 if (ret == FAILED)
3439 goto out;
3440
3441 INFO_bm_xfer_stats(mdev, "receive", &c);
3442
3443 if (mdev->state.conn == C_WF_BITMAP_T) {
3444 ok = !drbd_send_bitmap(mdev);
3445 if (!ok)
3446 goto out;
3447 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3448 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3449 D_ASSERT(ok == SS_SUCCESS);
3450 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3451 /* admin may have requested C_DISCONNECTING,
3452 * other threads may have noticed network errors */
3453 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3454 drbd_conn_str(mdev->state.conn));
3455 }
3456
3457 ok = TRUE;
3458 out:
3459 drbd_bm_unlock(mdev);
3460 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3461 drbd_start_resync(mdev, C_SYNC_SOURCE);
3462 free_page((unsigned long) buffer);
3463 return ok;
3464}
3465
3466static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3467{
3468 /* TODO zero copy sink :) */
3469 static char sink[128];
3470 int size, want, r;
3471
3472 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3473 h->command, h->length);
3474
3475 size = h->length;
3476 while (size > 0) {
3477 want = min_t(int, size, sizeof(sink));
3478 r = drbd_recv(mdev, sink, want);
3479 ERR_IF(r <= 0) break;
3480 size -= r;
3481 }
3482 return size == 0;
3483}
3484
3485static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3486{
3487 if (mdev->state.disk >= D_INCONSISTENT)
3488 drbd_kick_lo(mdev);
3489
3490 /* Make sure we've acked all the TCP data associated
3491 * with the data requests being unplugged */
3492 drbd_tcp_quickack(mdev->data.socket);
3493
3494 return TRUE;
3495}
3496
3497typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3498
3499static drbd_cmd_handler_f drbd_default_handler[] = {
3500 [P_DATA] = receive_Data,
3501 [P_DATA_REPLY] = receive_DataReply,
3502 [P_RS_DATA_REPLY] = receive_RSDataReply,
3503 [P_BARRIER] = receive_Barrier,
3504 [P_BITMAP] = receive_bitmap,
3505 [P_COMPRESSED_BITMAP] = receive_bitmap,
3506 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3507 [P_DATA_REQUEST] = receive_DataRequest,
3508 [P_RS_DATA_REQUEST] = receive_DataRequest,
3509 [P_SYNC_PARAM] = receive_SyncParam,
3510 [P_SYNC_PARAM89] = receive_SyncParam,
3511 [P_PROTOCOL] = receive_protocol,
3512 [P_UUIDS] = receive_uuids,
3513 [P_SIZES] = receive_sizes,
3514 [P_STATE] = receive_state,
3515 [P_STATE_CHG_REQ] = receive_req_state,
3516 [P_SYNC_UUID] = receive_sync_uuid,
3517 [P_OV_REQUEST] = receive_DataRequest,
3518 [P_OV_REPLY] = receive_DataRequest,
3519 [P_CSUM_RS_REQUEST] = receive_DataRequest,
3520 /* anything missing from this table is in
3521 * the asender_tbl, see get_asender_cmd */
3522 [P_MAX_CMD] = NULL,
3523};
3524
3525static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3526static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3527
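/* Main dispatch loop of the receiver thread: read one packet header at a
 * time, look up the handler in the table above and let it consume the
 * payload.  An unknown packet type or a failing handler forces the
 * connection into C_PROTOCOL_ERROR and ends the loop. */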
3528static void drbdd(struct drbd_conf *mdev)
3529{
3530 drbd_cmd_handler_f handler;
3531 struct p_header *header = &mdev->data.rbuf.header;
3532
3533 while (get_t_state(&mdev->receiver) == Running) {
3534 drbd_thread_current_set_cpu(mdev);
3535 if (!drbd_recv_header(mdev, header)) {
3536 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3537 break;
3538 }
3539
3540 if (header->command < P_MAX_CMD)
3541 handler = drbd_cmd_handler[header->command];
3542 else if (P_MAY_IGNORE < header->command
3543 && header->command < P_MAX_OPT_CMD)
3544 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3545 else if (header->command > P_MAX_OPT_CMD)
3546 handler = receive_skip;
3547 else
3548 handler = NULL;
3549
3550 if (unlikely(!handler)) {
3551 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3552 header->command, header->length);
3553 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3554 break;
3555 }
3556 if (unlikely(!handler(mdev, header))) {
3557 dev_err(DEV, "error receiving %s, l: %d!\n",
3558 cmdname(header->command), header->length);
3559 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3560 break;
3561 }
3562 }
3563}
3564
3565static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3566{
3567 struct hlist_head *slot;
3568 struct hlist_node *pos;
3569 struct hlist_node *tmp;
3570 struct drbd_request *req;
3571 int i;
3572
3573 /*
3574 * Application READ requests
3575 */
3576 spin_lock_irq(&mdev->req_lock);
3577 for (i = 0; i < APP_R_HSIZE; i++) {
3578 slot = mdev->app_reads_hash+i;
3579 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3580 /* it may (but should not any longer!)
3581 * be on the work queue; if that assert triggers,
3582 * we need to also grab the
3583 * spin_lock_irq(&mdev->data.work.q_lock);
3584 * and list_del_init here. */
3585 D_ASSERT(list_empty(&req->w.list));
3586 /* It would be nice to complete outside of spinlock.
3587 * But this is easier for now. */
3588 _req_mod(req, connection_lost_while_pending);
3589 }
3590 }
3591 for (i = 0; i < APP_R_HSIZE; i++)
3592 if (!hlist_empty(mdev->app_reads_hash+i))
3593 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3594 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3595
3596 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3597 spin_unlock_irq(&mdev->req_lock);
3598}
3599
3600void drbd_flush_workqueue(struct drbd_conf *mdev)
3601{
3602 struct drbd_wq_barrier barr;
3603
3604 barr.w.cb = w_prev_work_done;
3605 init_completion(&barr.done);
3606 drbd_queue_work(&mdev->data.work, &barr.w);
3607 wait_for_completion(&barr.done);
3608}
3609
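/* Tear down the connection: stop the asender, free the sockets, wait for
 * outstanding epoch entries, cancel resync bookkeeping, flush the worker
 * queue, fail pending application reads, possibly fence the peer, and move
 * to C_UNCONNECTED; a requested C_DISCONNECTING additionally frees the
 * ee/tl hash tables once no net or application references remain. */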
3610static void drbd_disconnect(struct drbd_conf *mdev)
3611{
3612 enum drbd_fencing_p fp;
3613 union drbd_state os, ns;
3614 int rv = SS_UNKNOWN_ERROR;
3615 unsigned int i;
3616
3617 if (mdev->state.conn == C_STANDALONE)
3618 return;
3619 if (mdev->state.conn >= C_WF_CONNECTION)
3620 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3621 drbd_conn_str(mdev->state.conn));
3622
3623 /* asender does not clean up anything. it must not interfere, either */
3624 drbd_thread_stop(&mdev->asender);
3625	drbd_free_sock(mdev);
3626
3627 spin_lock_irq(&mdev->req_lock);
3628 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3629 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3630 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3631 spin_unlock_irq(&mdev->req_lock);
3632
3633 /* We do not have data structures that would allow us to
3634 * get the rs_pending_cnt down to 0 again.
3635 * * On C_SYNC_TARGET we do not have any data structures describing
3636 * the pending RSDataRequest's we have sent.
3637 * * On C_SYNC_SOURCE there is no data structure that tracks
3638 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3639 * And no, it is not the sum of the reference counts in the
3640 * resync_LRU. The resync_LRU tracks the whole operation including
3641 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3642 * on the fly. */
3643 drbd_rs_cancel_all(mdev);
3644 mdev->rs_total = 0;
3645 mdev->rs_failed = 0;
3646 atomic_set(&mdev->rs_pending_cnt, 0);
3647 wake_up(&mdev->misc_wait);
3648
3649 /* make sure syncer is stopped and w_resume_next_sg queued */
3650 del_timer_sync(&mdev->resync_timer);
3651 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3652 resync_timer_fn((unsigned long)mdev);
3653
3654	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3655 * w_make_resync_request etc. which may still be on the worker queue
3656 * to be "canceled" */
3657 drbd_flush_workqueue(mdev);
3658
3659 /* This also does reclaim_net_ee(). If we do this too early, we might
3660 * miss some resync ee and pages.*/
3661 drbd_process_done_ee(mdev);
3662
3663 kfree(mdev->p_uuid);
3664 mdev->p_uuid = NULL;
3665
3666 if (!mdev->state.susp)
3667 tl_clear(mdev);
3668
3669 drbd_fail_pending_reads(mdev);
3670
3671 dev_info(DEV, "Connection closed\n");
3672
3673 drbd_md_sync(mdev);
3674
3675 fp = FP_DONT_CARE;
3676 if (get_ldev(mdev)) {
3677 fp = mdev->ldev->dc.fencing;
3678 put_ldev(mdev);
3679 }
3680
3681 if (mdev->state.role == R_PRIMARY) {
3682 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3683 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3684 drbd_request_state(mdev, NS(pdsk, nps));
3685 }
3686 }
3687
3688 spin_lock_irq(&mdev->req_lock);
3689 os = mdev->state;
3690 if (os.conn >= C_UNCONNECTED) {
3691 /* Do not restart in case we are C_DISCONNECTING */
3692 ns = os;
3693 ns.conn = C_UNCONNECTED;
3694 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3695 }
3696 spin_unlock_irq(&mdev->req_lock);
3697
3698 if (os.conn == C_DISCONNECTING) {
3699 struct hlist_head *h;
3700 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3701
3702 /* we must not free the tl_hash
3703 * while application io is still on the fly */
3704 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3705
3706 spin_lock_irq(&mdev->req_lock);
3707 /* paranoia code */
3708 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3709 if (h->first)
3710 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3711 (int)(h - mdev->ee_hash), h->first);
3712 kfree(mdev->ee_hash);
3713 mdev->ee_hash = NULL;
3714 mdev->ee_hash_s = 0;
3715
3716 /* paranoia code */
3717 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3718 if (h->first)
3719 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3720 (int)(h - mdev->tl_hash), h->first);
3721 kfree(mdev->tl_hash);
3722 mdev->tl_hash = NULL;
3723 mdev->tl_hash_s = 0;
3724 spin_unlock_irq(&mdev->req_lock);
3725
3726 crypto_free_hash(mdev->cram_hmac_tfm);
3727 mdev->cram_hmac_tfm = NULL;
3728
3729 kfree(mdev->net_conf);
3730 mdev->net_conf = NULL;
3731 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3732 }
3733
3734 /* tcp_close and release of sendpage pages can be deferred. I don't
3735 * want to use SO_LINGER, because apparently it can be deferred for
3736 * more than 20 seconds (longest time I checked).
3737 *
3738 * Actually we don't care for exactly when the network stack does its
3739 * put_page(), but release our reference on these pages right here.
3740 */
3741 i = drbd_release_ee(mdev, &mdev->net_ee);
3742 if (i)
3743 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3744 i = atomic_read(&mdev->pp_in_use);
3745 if (i)
3746 dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3747
3748 D_ASSERT(list_empty(&mdev->read_ee));
3749 D_ASSERT(list_empty(&mdev->active_ee));
3750 D_ASSERT(list_empty(&mdev->sync_ee));
3751 D_ASSERT(list_empty(&mdev->done_ee));
3752
3753 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3754 atomic_set(&mdev->current_epoch->epoch_size, 0);
3755 D_ASSERT(list_empty(&mdev->current_epoch->list));
3756}
3757
3758/*
3759 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3760 * we can agree on is stored in agreed_pro_version.
3761 *
3762 * feature flags and the reserved array should be enough room for future
3763 * enhancements of the handshake protocol, and possible plugins...
3764 *
3765 * for now, they are expected to be zero, but ignored.
3766 */
3767static int drbd_send_handshake(struct drbd_conf *mdev)
3768{
3769 /* ASSERT current == mdev->receiver ... */
3770 struct p_handshake *p = &mdev->data.sbuf.handshake;
3771 int ok;
3772
3773 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3774 dev_err(DEV, "interrupted during initial handshake\n");
3775 return 0; /* interrupted. not ok. */
3776 }
3777
3778 if (mdev->data.socket == NULL) {
3779 mutex_unlock(&mdev->data.mutex);
3780 return 0;
3781 }
3782
3783 memset(p, 0, sizeof(*p));
3784 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3785 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3786	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3787			    (struct p_header *)p, sizeof(*p), 0);
3788 mutex_unlock(&mdev->data.mutex);
3789 return ok;
3790}
3791
3792/*
3793 * return values:
3794 * 1 yes, we have a valid connection
3795 * 0 oops, did not work out, please try again
3796 * -1 peer talks different language,
3797 * no point in trying again, please go standalone.
3798 */
3799static int drbd_do_handshake(struct drbd_conf *mdev)
3800{
3801 /* ASSERT current == mdev->receiver ... */
3802 struct p_handshake *p = &mdev->data.rbuf.handshake;
3803 const int expect = sizeof(struct p_handshake)
3804 -sizeof(struct p_header);
3805 int rv;
3806
3807 rv = drbd_send_handshake(mdev);
3808 if (!rv)
3809 return 0;
3810
3811 rv = drbd_recv_header(mdev, &p->head);
3812 if (!rv)
3813 return 0;
3814
3815 if (p->head.command != P_HAND_SHAKE) {
3816 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3817 cmdname(p->head.command), p->head.command);
3818 return -1;
3819 }
3820
3821 if (p->head.length != expect) {
3822 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3823 expect, p->head.length);
3824 return -1;
3825 }
3826
3827 rv = drbd_recv(mdev, &p->head.payload, expect);
3828
3829 if (rv != expect) {
3830 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3831 return 0;
3832 }
3833
3834	p->protocol_min = be32_to_cpu(p->protocol_min);
3835 p->protocol_max = be32_to_cpu(p->protocol_max);
3836 if (p->protocol_max == 0)
3837 p->protocol_max = p->protocol_min;
3838
3839 if (PRO_VERSION_MAX < p->protocol_min ||
3840 PRO_VERSION_MIN > p->protocol_max)
3841 goto incompat;
3842
3843 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3844
3845 dev_info(DEV, "Handshake successful: "
3846 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3847
3848 return 1;
3849
3850 incompat:
3851 dev_err(DEV, "incompatible DRBD dialects: "
3852 "I support %d-%d, peer supports %d-%d\n",
3853 PRO_VERSION_MIN, PRO_VERSION_MAX,
3854 p->protocol_min, p->protocol_max);
3855 return -1;
3856}
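/*
 * Illustrative sketch (not part of drbd_receiver.c): the version agreement
 * above boils down to "the ranges must overlap, then take the highest version
 * both sides support".  A standalone model with hypothetical constants:
 */
#include <stdio.h>

#define DEMO_VER_MIN 86
#define DEMO_VER_MAX 91

/* returns the agreed version, or -1 if the ranges do not overlap */
static int demo_agree_version(int peer_min, int peer_max)
{
	if (peer_max == 0)		/* very old peer: speaks exactly one version */
		peer_max = peer_min;
	if (DEMO_VER_MAX < peer_min || DEMO_VER_MIN > peer_max)
		return -1;
	return DEMO_VER_MAX < peer_max ? DEMO_VER_MAX : peer_max;
}

int main(void)
{
	printf("agreed: %d\n", demo_agree_version(88, 94));	/* -> 91 */
	printf("agreed: %d\n", demo_agree_version(92, 95));	/* -> -1, incompatible */
	return 0;
}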
3857
3858#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3859static int drbd_do_auth(struct drbd_conf *mdev)
3860{
3861	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3862	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3863	return -1;
3864}
3865#else
3866#define CHALLENGE_LEN 64
3867
3868/* Return value:
3869 1 - auth succeeded,
3870 0 - failed, try again (network error),
3871 -1 - auth failed, don't try again.
3872*/
3873
3874static int drbd_do_auth(struct drbd_conf *mdev)
3875{
3876 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
3877 struct scatterlist sg;
3878 char *response = NULL;
3879 char *right_response = NULL;
3880 char *peers_ch = NULL;
3881 struct p_header p;
3882 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3883 unsigned int resp_size;
3884 struct hash_desc desc;
3885 int rv;
3886
3887 desc.tfm = mdev->cram_hmac_tfm;
3888 desc.flags = 0;
3889
3890 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3891 (u8 *)mdev->net_conf->shared_secret, key_len);
3892 if (rv) {
3893 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3894		rv = -1;
3895		goto fail;
3896 }
3897
3898 get_random_bytes(my_challenge, CHALLENGE_LEN);
3899
3900 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3901 if (!rv)
3902 goto fail;
3903
3904 rv = drbd_recv_header(mdev, &p);
3905 if (!rv)
3906 goto fail;
3907
3908 if (p.command != P_AUTH_CHALLENGE) {
3909 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3910 cmdname(p.command), p.command);
3911 rv = 0;
3912 goto fail;
3913 }
3914
3915 if (p.length > CHALLENGE_LEN*2) {
3916		dev_err(DEV, "AuthChallenge payload too big.\n");
3917		rv = -1;
3918		goto fail;
3919 }
3920
3921 peers_ch = kmalloc(p.length, GFP_NOIO);
3922 if (peers_ch == NULL) {
3923 dev_err(DEV, "kmalloc of peers_ch failed\n");
3924		rv = -1;
3925		goto fail;
3926 }
3927
3928 rv = drbd_recv(mdev, peers_ch, p.length);
3929
3930 if (rv != p.length) {
3931 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3932 rv = 0;
3933 goto fail;
3934 }
3935
3936 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3937 response = kmalloc(resp_size, GFP_NOIO);
3938 if (response == NULL) {
3939 dev_err(DEV, "kmalloc of response failed\n");
3940		rv = -1;
3941		goto fail;
3942 }
3943
3944 sg_init_table(&sg, 1);
3945 sg_set_buf(&sg, peers_ch, p.length);
3946
3947 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3948 if (rv) {
3949 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3950		rv = -1;
3951		goto fail;
3952 }
3953
3954 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3955 if (!rv)
3956 goto fail;
3957
3958 rv = drbd_recv_header(mdev, &p);
3959 if (!rv)
3960 goto fail;
3961
3962 if (p.command != P_AUTH_RESPONSE) {
3963 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3964 cmdname(p.command), p.command);
3965 rv = 0;
3966 goto fail;
3967 }
3968
3969 if (p.length != resp_size) {
3970		dev_err(DEV, "AuthResponse payload of wrong size\n");
3971 rv = 0;
3972 goto fail;
3973 }
3974
3975	rv = drbd_recv(mdev, response, resp_size);
3976
3977 if (rv != resp_size) {
3978 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3979 rv = 0;
3980 goto fail;
3981 }
3982
3983 right_response = kmalloc(resp_size, GFP_NOIO);
3984	if (right_response == NULL) {
3985		dev_err(DEV, "kmalloc of right_response failed\n");
3986		rv = -1;
3987		goto fail;
3988 }
3989
3990 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3991
3992 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3993 if (rv) {
3994 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3995		rv = -1;
3996		goto fail;
3997 }
3998
3999 rv = !memcmp(response, right_response, resp_size);
4000
4001 if (rv)
4002 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4003 resp_size, mdev->net_conf->cram_hmac_alg);
4004	else
4005		rv = -1;
4006
4007 fail:
4008 kfree(peers_ch);
4009 kfree(response);
4010 kfree(right_response);
4011
4012 return rv;
4013}
4014#endif
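/*
 * Illustrative sketch (not part of drbd_receiver.c): the CRAM-HMAC exchange
 * above amounts to "each side HMACs the peer's random challenge with the
 * shared secret; the peer recomputes and compares".  The standalone userspace
 * model below collapses both sides into one process and uses OpenSSL's HMAC()
 * with HMAC-SHA256 purely as an example digest; all names and buffers are
 * hypothetical.
 */
#include <stdio.h>
#include <string.h>
#include <openssl/evp.h>
#include <openssl/hmac.h>
#include <openssl/rand.h>

#define DEMO_CHALLENGE_LEN 64

static int demo_hmac(const char *secret, const unsigned char *challenge,
		     size_t challenge_len, unsigned char *out, unsigned int *out_len)
{
	/* HMAC over the peer's challenge, keyed with the shared secret */
	return HMAC(EVP_sha256(), secret, (int)strlen(secret),
		    challenge, challenge_len, out, out_len) != NULL;
}

int main(void)
{
	const char *secret = "shared-secret";	/* stand-in for the drbd.conf shared-secret */
	unsigned char challenge[DEMO_CHALLENGE_LEN];
	unsigned char response[EVP_MAX_MD_SIZE], expected[EVP_MAX_MD_SIZE];
	unsigned int rlen, elen;

	if (RAND_bytes(challenge, sizeof(challenge)) != 1)
		return 1;

	/* the "peer" answers the challenge ... */
	if (!demo_hmac(secret, challenge, sizeof(challenge), response, &rlen))
		return 1;
	/* ... and "we" recompute and compare (a constant-time compare is preferable) */
	if (!demo_hmac(secret, challenge, sizeof(challenge), expected, &elen))
		return 1;

	if (rlen == elen && memcmp(response, expected, rlen) == 0)
		puts("peer authenticated");
	else
		puts("authentication failed");
	return 0;
}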
4015
4016int drbdd_init(struct drbd_thread *thi)
4017{
4018 struct drbd_conf *mdev = thi->mdev;
4019 unsigned int minor = mdev_to_minor(mdev);
4020 int h;
4021
4022 sprintf(current->comm, "drbd%d_receiver", minor);
4023
4024 dev_info(DEV, "receiver (re)started\n");
4025
4026 do {
4027 h = drbd_connect(mdev);
4028 if (h == 0) {
4029 drbd_disconnect(mdev);
4030 __set_current_state(TASK_INTERRUPTIBLE);
4031 schedule_timeout(HZ);
4032 }
4033 if (h == -1) {
4034 dev_warn(DEV, "Discarding network configuration.\n");
4035 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4036 }
4037 } while (h == 0);
4038
4039 if (h > 0) {
4040 if (get_net_conf(mdev)) {
4041 drbdd(mdev);
4042 put_net_conf(mdev);
4043 }
4044 }
4045
4046 drbd_disconnect(mdev);
4047
4048 dev_info(DEV, "receiver terminated\n");
4049 return 0;
4050}
4051
4052/* ********* acknowledge sender ******** */
4053
4054static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4055{
4056 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4057
4058 int retcode = be32_to_cpu(p->retcode);
4059
4060 if (retcode >= SS_SUCCESS) {
4061 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4062 } else {
4063 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4064 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4065 drbd_set_st_err_str(retcode), retcode);
4066 }
4067 wake_up(&mdev->state_wait);
4068
4069 return TRUE;
4070}
4071
4072static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4073{
4074 return drbd_send_ping_ack(mdev);
4075
4076}
4077
4078static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4079{
4080 /* restore idle timeout */
4081 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4082	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4083		wake_up(&mdev->misc_wait);
4084
4085 return TRUE;
4086}
4087
4088static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4089{
4090 struct p_block_ack *p = (struct p_block_ack *)h;
4091 sector_t sector = be64_to_cpu(p->sector);
4092 int blksize = be32_to_cpu(p->blksize);
4093
4094 D_ASSERT(mdev->agreed_pro_version >= 89);
4095
4096 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4097
4098 drbd_rs_complete_io(mdev, sector);
4099 drbd_set_in_sync(mdev, sector, blksize);
4100 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4101 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4102 dec_rs_pending(mdev);
4103
4104 return TRUE;
4105}
4106
4107/* when we receive the ACK for a write request,
4108 * verify that we actually know about it */
4109static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4110 u64 id, sector_t sector)
4111{
4112 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4113 struct hlist_node *n;
4114 struct drbd_request *req;
4115
4116 hlist_for_each_entry(req, n, slot, colision) {
4117 if ((unsigned long)req == (unsigned long)id) {
4118 if (req->sector != sector) {
4119 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4120 "wrong sector (%llus versus %llus)\n", req,
4121 (unsigned long long)req->sector,
4122 (unsigned long long)sector);
4123 break;
4124 }
4125 return req;
4126 }
4127 }
4128 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4129 (void *)(unsigned long)id, (unsigned long long)sector);
4130 return NULL;
4131}
4132
4133typedef struct drbd_request *(req_validator_fn)
4134 (struct drbd_conf *mdev, u64 id, sector_t sector);
4135
4136static int validate_req_change_req_state(struct drbd_conf *mdev,
4137 u64 id, sector_t sector, req_validator_fn validator,
4138 const char *func, enum drbd_req_event what)
4139{
4140 struct drbd_request *req;
4141 struct bio_and_error m;
4142
4143 spin_lock_irq(&mdev->req_lock);
4144 req = validator(mdev, id, sector);
4145 if (unlikely(!req)) {
4146 spin_unlock_irq(&mdev->req_lock);
4147 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4148 return FALSE;
4149 }
4150 __req_mod(req, what, &m);
4151 spin_unlock_irq(&mdev->req_lock);
4152
4153 if (m.bio)
4154 complete_master_bio(mdev, &m);
4155 return TRUE;
4156}
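/*
 * Illustrative sketch (not part of drbd_receiver.c): the helper above does a
 * "look up and transition under the lock, finish the completion work outside
 * the lock" dance.  A minimal pthread model with hypothetical types
 * (demo_req, demo_table, demo_ack):
 */
#include <pthread.h>
#include <stdio.h>

struct demo_req {
	unsigned long long id;
	int acked;
};

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static struct demo_req *demo_table[16];	/* stand-in for the tl_hash slots */

static int demo_ack(unsigned long long id)
{
	struct demo_req *req = NULL;
	unsigned int i;

	pthread_mutex_lock(&demo_lock);
	for (i = 0; i < 16; i++)
		if (demo_table[i] && demo_table[i]->id == id)
			req = demo_table[i];
	if (!req) {
		pthread_mutex_unlock(&demo_lock);
		fprintf(stderr, "got an ack for an unknown request\n");
		return 0;
	}
	req->acked = 1;		/* state change while still under the lock */
	pthread_mutex_unlock(&demo_lock);

	/* completion work (e.g. finishing the master bio) happens unlocked */
	printf("request %llu completed\n", req->id);
	return 1;
}

int main(void)
{
	struct demo_req r = { .id = 42, .acked = 0 };

	demo_table[3] = &r;
	return demo_ack(42) ? 0 : 1;
}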
4157
4158static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4159{
4160 struct p_block_ack *p = (struct p_block_ack *)h;
4161 sector_t sector = be64_to_cpu(p->sector);
4162 int blksize = be32_to_cpu(p->blksize);
4163 enum drbd_req_event what;
4164
4165 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4166
4167 if (is_syncer_block_id(p->block_id)) {
4168 drbd_set_in_sync(mdev, sector, blksize);
4169 dec_rs_pending(mdev);
4170 return TRUE;
4171 }
4172 switch (be16_to_cpu(h->command)) {
4173 case P_RS_WRITE_ACK:
4174 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4175 what = write_acked_by_peer_and_sis;
4176 break;
4177 case P_WRITE_ACK:
4178 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4179 what = write_acked_by_peer;
4180 break;
4181 case P_RECV_ACK:
4182 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4183 what = recv_acked_by_peer;
4184 break;
4185 case P_DISCARD_ACK:
4186 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4187 what = conflict_discarded_by_peer;
4188 break;
4189 default:
4190 D_ASSERT(0);
4191 return FALSE;
4192 }
4193
4194 return validate_req_change_req_state(mdev, p->block_id, sector,
4195 _ack_id_to_req, __func__ , what);
4196}
4197
4198static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4199{
4200 struct p_block_ack *p = (struct p_block_ack *)h;
4201 sector_t sector = be64_to_cpu(p->sector);
4202
4203 if (__ratelimit(&drbd_ratelimit_state))
4204		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4205
4206 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4207
4208 if (is_syncer_block_id(p->block_id)) {
4209 int size = be32_to_cpu(p->blksize);
4210 dec_rs_pending(mdev);
4211 drbd_rs_failed_io(mdev, sector, size);
4212 return TRUE;
4213 }
4214 return validate_req_change_req_state(mdev, p->block_id, sector,
4215 _ack_id_to_req, __func__ , neg_acked);
4216}
4217
4218static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4219{
4220 struct p_block_ack *p = (struct p_block_ack *)h;
4221 sector_t sector = be64_to_cpu(p->sector);
4222
4223 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4224 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4225 (unsigned long long)sector, be32_to_cpu(p->blksize));
4226
4227 return validate_req_change_req_state(mdev, p->block_id, sector,
4228 _ar_id_to_req, __func__ , neg_acked);
4229}
4230
4231static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4232{
4233 sector_t sector;
4234 int size;
4235 struct p_block_ack *p = (struct p_block_ack *)h;
4236
4237 sector = be64_to_cpu(p->sector);
4238 size = be32_to_cpu(p->blksize);
4239
4240 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4241
4242 dec_rs_pending(mdev);
4243
4244 if (get_ldev_if_state(mdev, D_FAILED)) {
4245 drbd_rs_complete_io(mdev, sector);
4246 drbd_rs_failed_io(mdev, sector, size);
4247 put_ldev(mdev);
4248 }
4249
4250 return TRUE;
4251}
4252
4253static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4254{
4255 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4256
4257 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4258
4259 return TRUE;
4260}
4261
4262static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4263{
4264 struct p_block_ack *p = (struct p_block_ack *)h;
4265 struct drbd_work *w;
4266 sector_t sector;
4267 int size;
4268
4269 sector = be64_to_cpu(p->sector);
4270 size = be32_to_cpu(p->blksize);
4271
4272 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4273
4274 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4275 drbd_ov_oos_found(mdev, sector, size);
4276 else
4277 ov_oos_print(mdev);
4278
4279 drbd_rs_complete_io(mdev, sector);
4280 dec_rs_pending(mdev);
4281
4282 if (--mdev->ov_left == 0) {
4283 w = kmalloc(sizeof(*w), GFP_NOIO);
4284 if (w) {
4285 w->cb = w_ov_finished;
4286 drbd_queue_work_front(&mdev->data.work, w);
4287 } else {
4288			dev_err(DEV, "kmalloc(w) failed.\n");
4289 ov_oos_print(mdev);
4290 drbd_resync_finished(mdev);
4291 }
4292 }
4293 return TRUE;
4294}
4295
4296struct asender_cmd {
4297 size_t pkt_size;
4298 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4299};
4300
4301static struct asender_cmd *get_asender_cmd(int cmd)
4302{
4303 static struct asender_cmd asender_tbl[] = {
4304 /* anything missing from this table is in
4305 * the drbd_cmd_handler (drbd_default_handler) table,
4306 * see the beginning of drbdd() */
4307 [P_PING] = { sizeof(struct p_header), got_Ping },
4308 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4309 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4310 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4311 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4312 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4313 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4314 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4315 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4316 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4317 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4318 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4319 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4320 [P_MAX_CMD] = { 0, NULL },
4321 };
4322 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4323 return NULL;
4324 return &asender_tbl[cmd];
4325}
4326
4327int drbd_asender(struct drbd_thread *thi)
4328{
4329 struct drbd_conf *mdev = thi->mdev;
4330 struct p_header *h = &mdev->meta.rbuf.header;
4331 struct asender_cmd *cmd = NULL;
4332
4333 int rv, len;
4334 void *buf = h;
4335 int received = 0;
4336 int expect = sizeof(struct p_header);
4337 int empty;
4338
4339 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4340
4341 current->policy = SCHED_RR; /* Make this a realtime task! */
4342 current->rt_priority = 2; /* more important than all other tasks */
4343
4344 while (get_t_state(thi) == Running) {
4345 drbd_thread_current_set_cpu(mdev);
4346 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4347 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4348 mdev->meta.socket->sk->sk_rcvtimeo =
4349 mdev->net_conf->ping_timeo*HZ/10;
4350 }
4351
4352 /* conditionally cork;
4353 * it may hurt latency if we cork without much to send */
4354 if (!mdev->net_conf->no_cork &&
4355 3 < atomic_read(&mdev->unacked_cnt))
4356 drbd_tcp_cork(mdev->meta.socket);
4357 while (1) {
4358 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4359 flush_signals(current);
4360 if (!drbd_process_done_ee(mdev)) {
4361 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4362 goto reconnect;
4363 }
4364 /* to avoid race with newly queued ACKs */
4365 set_bit(SIGNAL_ASENDER, &mdev->flags);
4366 spin_lock_irq(&mdev->req_lock);
4367 empty = list_empty(&mdev->done_ee);
4368 spin_unlock_irq(&mdev->req_lock);
4369 /* new ack may have been queued right here,
4370 * but then there is also a signal pending,
4371 * and we start over... */
4372 if (empty)
4373 break;
4374 }
4375 /* but unconditionally uncork unless disabled */
4376 if (!mdev->net_conf->no_cork)
4377 drbd_tcp_uncork(mdev->meta.socket);
4378
4379 /* short circuit, recv_msg would return EINTR anyways. */
4380 if (signal_pending(current))
4381 continue;
4382
4383 rv = drbd_recv_short(mdev, mdev->meta.socket,
4384 buf, expect-received, 0);
4385 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4386
4387 flush_signals(current);
4388
4389 /* Note:
4390 * -EINTR (on meta) we got a signal
4391 * -EAGAIN (on meta) rcvtimeo expired
4392 * -ECONNRESET other side closed the connection
4393 * -ERESTARTSYS (on data) we got a signal
4394 * rv < 0 other than above: unexpected error!
4395 * rv == expected: full header or command
4396 * rv < expected: "woken" by signal during receive
4397 * rv == 0 : "connection shut down by peer"
4398 */
4399 if (likely(rv > 0)) {
4400 received += rv;
4401 buf += rv;
4402 } else if (rv == 0) {
4403 dev_err(DEV, "meta connection shut down by peer.\n");
4404 goto reconnect;
4405 } else if (rv == -EAGAIN) {
4406 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4407 mdev->net_conf->ping_timeo*HZ/10) {
4408 dev_err(DEV, "PingAck did not arrive in time.\n");
4409 goto reconnect;
4410 }
4411 set_bit(SEND_PING, &mdev->flags);
4412 continue;
4413 } else if (rv == -EINTR) {
4414 continue;
4415 } else {
4416 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4417 goto reconnect;
4418 }
4419
4420 if (received == expect && cmd == NULL) {
4421 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4422 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4423 (long)be32_to_cpu(h->magic),
4424 h->command, h->length);
4425 goto reconnect;
4426 }
4427 cmd = get_asender_cmd(be16_to_cpu(h->command));
4428 len = be16_to_cpu(h->length);
4429 if (unlikely(cmd == NULL)) {
4430 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4431 (long)be32_to_cpu(h->magic),
4432 h->command, h->length);
4433 goto disconnect;
4434 }
4435 expect = cmd->pkt_size;
4436			ERR_IF(len != expect-sizeof(struct p_header))
4437				goto reconnect;
4438		}
4439 if (received == expect) {
4440 D_ASSERT(cmd != NULL);
4441			if (!cmd->process(mdev, h))
4442 goto reconnect;
4443
4444 buf = h;
4445 received = 0;
4446 expect = sizeof(struct p_header);
4447 cmd = NULL;
4448 }
4449 }
4450
4451 if (0) {
4452reconnect:
4453 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4454 }
4455 if (0) {
4456disconnect:
4457 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4458 }
4459 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4460
4461 D_ASSERT(mdev->state.conn < C_CONNECTED);
4462 dev_info(DEV, "asender terminated\n");
4463
4464 return 0;
4465}
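/*
 * Illustrative sketch (not part of drbd_receiver.c): the asender loop above
 * accumulates bytes until a full header (or a full command) has arrived,
 * treating 0 as "peer closed" and EINTR as "try again".  A standalone model
 * of that accumulate-until-expect pattern over a pipe, with the hypothetical
 * helper demo_recv_exact():
 */
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

/* returns 1 when exactly `expect` bytes were read, 0 on EOF, -1 on error */
static int demo_recv_exact(int fd, void *buf, size_t expect)
{
	size_t received = 0;

	while (received < expect) {
		ssize_t rv = read(fd, (char *)buf + received, expect - received);

		if (rv > 0)
			received += rv;
		else if (rv == 0)
			return 0;		/* connection shut down by peer */
		else if (errno == EINTR)
			continue;		/* woken by a signal, keep going */
		else
			return -1;		/* unexpected error */
	}
	return 1;
}

int main(void)
{
	int fds[2];
	char header[8], msg[] = "8bytes!!";

	if (pipe(fds))
		return 1;
	if (write(fds[1], msg, sizeof(header)) != (ssize_t)sizeof(header))
		return 1;
	if (demo_recv_exact(fds[0], header, sizeof(header)) == 1)
		printf("got full header: %.8s\n", header);
	close(fds[0]);
	close(fds[1]);
	return 0;
}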