/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
24
25
26#include <linux/autoconf.h>
27#include <linux/module.h>
28
29#include <asm/uaccess.h>
30#include <net/sock.h>
31
32#include <linux/version.h>
33#include <linux/drbd.h>
34#include <linux/fs.h>
35#include <linux/file.h>
36#include <linux/in.h>
37#include <linux/mm.h>
38#include <linux/memcontrol.h>
39#include <linux/mm_inline.h>
40#include <linux/slab.h>
41#include <linux/smp_lock.h>
42#include <linux/pkt_sched.h>
43#define __KERNEL_SYSCALLS__
44#include <linux/unistd.h>
45#include <linux/vmalloc.h>
46#include <linux/random.h>
47#include <linux/mm.h>
48#include <linux/string.h>
49#include <linux/scatterlist.h>
50#include "drbd_int.h"
51#include "drbd_tracing.h"
52#include "drbd_req.h"
53
54#include "drbd_vli.h"
55
/* Work item used to run a disk flush for one epoch from worker context.
 * It is allocated in drbd_may_finish_epoch() and consumed (and freed) by
 * w_flush(), which casts the struct drbd_work pointer it is handed back to
 * a struct flush_work -- so @w has to remain the first member. */
struct flush_work {
	struct drbd_work w;		/* embedded work object; must stay first */
	struct drbd_epoch *epoch;	/* epoch the flush is issued for */
};
60
/* Result of drbd_may_finish_epoch(): what happened to the epoch object
 * that was passed in. */
enum finish_epoch {
	FE_STILL_LIVE,	/* the epoch was not finished */
	FE_DESTROYED,	/* the epoch was unlinked from the list and kfree()d */
	FE_RECYCLED,	/* it was mdev->current_epoch: flags and size were reset for reuse */
};
66
67static int drbd_do_handshake(struct drbd_conf *mdev);
68static int drbd_do_auth(struct drbd_conf *mdev);
69
70static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
71static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
72
73static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
74{
75 struct drbd_epoch *prev;
76 spin_lock(&mdev->epoch_lock);
77 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
78 if (prev == epoch || prev == mdev->current_epoch)
79 prev = NULL;
80 spin_unlock(&mdev->epoch_lock);
81 return prev;
82}
83
/* Allocation flags for the opportunistic alloc_page() attempt in
 * drbd_pp_first_page_or_try_alloc(): highmem pages are acceptable and
 * allocation failures are not warned about. */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
85
86static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
87{
88 struct page *page = NULL;
89
90 /* Yes, testing drbd_pp_vacant outside the lock is racy.
91 * So what. It saves a spin_lock. */
92 if (drbd_pp_vacant > 0) {
93 spin_lock(&drbd_pp_lock);
94 page = drbd_pp_pool;
95 if (page) {
96 drbd_pp_pool = (struct page *)page_private(page);
97 set_page_private(page, 0); /* just to be polite */
98 drbd_pp_vacant--;
99 }
100 spin_unlock(&drbd_pp_lock);
101 }
102 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
103 * "criss-cross" setup, that might cause write-out on some other DRBD,
104 * which in turn might block on the other node at this very place. */
105 if (!page)
106 page = alloc_page(GFP_TRY);
107 if (page)
108 atomic_inc(&mdev->pp_in_use);
109 return page;
110}
111
112/* kick lower level device, if we have more than (arbitrary number)
113 * reference counts on it, which typically are locally submitted io
114 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
115static void maybe_kick_lo(struct drbd_conf *mdev)
116{
117 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
118 drbd_kick_lo(mdev);
119}
120
121static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
122{
123 struct drbd_epoch_entry *e;
124 struct list_head *le, *tle;
125
126 /* The EEs are always appended to the end of the list. Since
127 they are sent in order over the wire, they have to finish
128 in order. As soon as we see the first not finished we can
129 stop to examine the list... */
130
131 list_for_each_safe(le, tle, &mdev->net_ee) {
132 e = list_entry(le, struct drbd_epoch_entry, w.list);
133 if (drbd_bio_has_active_page(e->private_bio))
134 break;
135 list_move(le, to_be_freed);
136 }
137}
138
139static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
140{
141 LIST_HEAD(reclaimed);
142 struct drbd_epoch_entry *e, *t;
143
144 maybe_kick_lo(mdev);
145 spin_lock_irq(&mdev->req_lock);
146 reclaim_net_ee(mdev, &reclaimed);
147 spin_unlock_irq(&mdev->req_lock);
148
149 list_for_each_entry_safe(e, t, &reclaimed, w.list)
150 drbd_free_ee(mdev, e);
151}
152
153/**
154 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
155 * @mdev: DRBD device.
156 * @retry: whether or not to retry allocation forever (or until signalled)
157 *
158 * Tries to allocate a page, first from our own page pool, then from the
159 * kernel, unless this allocation would exceed the max_buffers setting.
160 * If @retry is non-zero, retry until DRBD frees a page somewhere else.
161 */
162static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
163{
164 struct page *page = NULL;
165 DEFINE_WAIT(wait);
166
167 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
168 page = drbd_pp_first_page_or_try_alloc(mdev);
169 if (page)
170 return page;
171 }
172
173 for (;;) {
174 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
175
176 drbd_kick_lo_and_reclaim_net(mdev);
177
178 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
179 page = drbd_pp_first_page_or_try_alloc(mdev);
180 if (page)
181 break;
182 }
183
184 if (!retry)
185 break;
186
187 if (signal_pending(current)) {
188 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
189 break;
190 }
191
192 schedule();
193 }
194 finish_wait(&drbd_pp_wait, &wait);
195
196 return page;
197}
198
199/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
200 * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
201static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
202{
203 int free_it;
204
205 spin_lock(&drbd_pp_lock);
206 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
207 free_it = 1;
208 } else {
209 set_page_private(page, (unsigned long)drbd_pp_pool);
210 drbd_pp_pool = page;
211 drbd_pp_vacant++;
212 free_it = 0;
213 }
214 spin_unlock(&drbd_pp_lock);
215
216 atomic_dec(&mdev->pp_in_use);
217
218 if (free_it)
219 __free_page(page);
220
221 wake_up(&drbd_pp_wait);
222}
223
224static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
225{
226 struct page *p_to_be_freed = NULL;
227 struct page *page;
228 struct bio_vec *bvec;
229 int i;
230
231 spin_lock(&drbd_pp_lock);
232 __bio_for_each_segment(bvec, bio, i, 0) {
233 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
234 set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
235 p_to_be_freed = bvec->bv_page;
236 } else {
237 set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
238 drbd_pp_pool = bvec->bv_page;
239 drbd_pp_vacant++;
240 }
241 }
242 spin_unlock(&drbd_pp_lock);
243 atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
244
245 while (p_to_be_freed) {
246 page = p_to_be_freed;
247 p_to_be_freed = (struct page *)page_private(page);
248 set_page_private(page, 0); /* just to be polite */
249 put_page(page);
250 }
251
252 wake_up(&drbd_pp_wait);
253}
254
255/*
256You need to hold the req_lock:
257 _drbd_wait_ee_list_empty()
258
259You must not have the req_lock:
260 drbd_free_ee()
261 drbd_alloc_ee()
262 drbd_init_ee()
263 drbd_release_ee()
264 drbd_ee_fix_bhs()
265 drbd_process_done_ee()
266 drbd_clear_done_ee()
267 drbd_wait_ee_list_empty()
268*/
269
270struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
271 u64 id,
272 sector_t sector,
273 unsigned int data_size,
274 gfp_t gfp_mask) __must_hold(local)
275{
276 struct request_queue *q;
277 struct drbd_epoch_entry *e;
278 struct page *page;
279 struct bio *bio;
280 unsigned int ds;
281
282 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
283 return NULL;
284
285 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
286 if (!e) {
287 if (!(gfp_mask & __GFP_NOWARN))
288 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
289 return NULL;
290 }
291
292 bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
293 if (!bio) {
294 if (!(gfp_mask & __GFP_NOWARN))
295 dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
296 goto fail1;
297 }
298
299 bio->bi_bdev = mdev->ldev->backing_bdev;
300 bio->bi_sector = sector;
301
302 ds = data_size;
303 while (ds) {
304 page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
305 if (!page) {
306 if (!(gfp_mask & __GFP_NOWARN))
307 dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
308 goto fail2;
309 }
310 if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
311 drbd_pp_free(mdev, page);
312 dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
313 "data_size=%u,ds=%u) failed\n",
314 (unsigned long long)sector, data_size, ds);
315
316 q = bdev_get_queue(bio->bi_bdev);
317 if (q->merge_bvec_fn) {
318 struct bvec_merge_data bvm = {
319 .bi_bdev = bio->bi_bdev,
320 .bi_sector = bio->bi_sector,
321 .bi_size = bio->bi_size,
322 .bi_rw = bio->bi_rw,
323 };
324 int l = q->merge_bvec_fn(q, &bvm,
325 &bio->bi_io_vec[bio->bi_vcnt]);
326 dev_err(DEV, "merge_bvec_fn() = %d\n", l);
327 }
328
329 /* dump more of the bio. */
330 dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
331 dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
332 dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
333 dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
334
335 goto fail2;
336 break;
337 }
338 ds -= min_t(int, ds, PAGE_SIZE);
339 }
340
341 D_ASSERT(data_size == bio->bi_size);
342
343 bio->bi_private = e;
344 e->mdev = mdev;
345 e->sector = sector;
346 e->size = bio->bi_size;
347
348 e->private_bio = bio;
349 e->block_id = id;
350 INIT_HLIST_NODE(&e->colision);
351 e->epoch = NULL;
352 e->flags = 0;
353
354 trace_drbd_ee(mdev, e, "allocated");
355
356 return e;
357
358 fail2:
359 drbd_pp_free_bio_pages(mdev, bio);
360 bio_put(bio);
361 fail1:
362 mempool_free(e, drbd_ee_mempool);
363
364 return NULL;
365}
366
367void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
368{
369 struct bio *bio = e->private_bio;
370 trace_drbd_ee(mdev, e, "freed");
371 drbd_pp_free_bio_pages(mdev, bio);
372 bio_put(bio);
373 D_ASSERT(hlist_unhashed(&e->colision));
374 mempool_free(e, drbd_ee_mempool);
375}
376
377int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
378{
379 LIST_HEAD(work_list);
380 struct drbd_epoch_entry *e, *t;
381 int count = 0;
382
383 spin_lock_irq(&mdev->req_lock);
384 list_splice_init(list, &work_list);
385 spin_unlock_irq(&mdev->req_lock);
386
387 list_for_each_entry_safe(e, t, &work_list, w.list) {
388 drbd_free_ee(mdev, e);
389 count++;
390 }
391 return count;
392}
393
394
395/*
396 * This function is called from _asender only_
397 * but see also comments in _req_mod(,barrier_acked)
398 * and receive_Barrier.
399 *
400 * Move entries from net_ee to done_ee, if ready.
401 * Grab done_ee, call all callbacks, free the entries.
402 * The callbacks typically send out ACKs.
403 */
404static int drbd_process_done_ee(struct drbd_conf *mdev)
405{
406 LIST_HEAD(work_list);
407 LIST_HEAD(reclaimed);
408 struct drbd_epoch_entry *e, *t;
409 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
410
411 spin_lock_irq(&mdev->req_lock);
412 reclaim_net_ee(mdev, &reclaimed);
413 list_splice_init(&mdev->done_ee, &work_list);
414 spin_unlock_irq(&mdev->req_lock);
415
416 list_for_each_entry_safe(e, t, &reclaimed, w.list)
417 drbd_free_ee(mdev, e);
418
419 /* possible callbacks here:
420 * e_end_block, and e_end_resync_block, e_send_discard_ack.
421 * all ignore the last argument.
422 */
423 list_for_each_entry_safe(e, t, &work_list, w.list) {
424 trace_drbd_ee(mdev, e, "process_done_ee");
425 /* list_del not necessary, next/prev members not touched */
426 ok = e->w.cb(mdev, &e->w, !ok) && ok;
427 drbd_free_ee(mdev, e);
428 }
429 wake_up(&mdev->ee_wait);
430
431 return ok;
432}
433
/* Sleep (uninterruptibly) until the epoch entry list @head is empty.
 *
 * Must be called with mdev->req_lock held (spin_lock_irq); the lock is
 * dropped around each sleep and re-acquired before @head is checked again,
 * so it is held again when this function returns.  The lower level device
 * is kicked before every sleep. */
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		/* queue ourselves on ee_wait while still holding req_lock */
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->req_lock);
		drbd_kick_lo(mdev);
		schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->req_lock);
	}
}
449
/* Variant of _drbd_wait_ee_list_empty() for callers that do not hold
 * mdev->req_lock: takes the lock, waits until @head is empty, and drops
 * the lock again. */
void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->req_lock);
}
456
457/* see also kernel_accept; which is only present since 2.6.18.
458 * also we want to log which part of it failed, exactly */
459static int drbd_accept(struct drbd_conf *mdev, const char **what,
460 struct socket *sock, struct socket **newsock)
461{
462 struct sock *sk = sock->sk;
463 int err = 0;
464
465 *what = "listen";
466 err = sock->ops->listen(sock, 5);
467 if (err < 0)
468 goto out;
469
470 *what = "sock_create_lite";
471 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
472 newsock);
473 if (err < 0)
474 goto out;
475
476 *what = "accept";
477 err = sock->ops->accept(sock, *newsock, 0);
478 if (err < 0) {
479 sock_release(*newsock);
480 *newsock = NULL;
481 goto out;
482 }
483 (*newsock)->ops = sock->ops;
484
485out:
486 return err;
487}
488
489static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
490 void *buf, size_t size, int flags)
491{
492 mm_segment_t oldfs;
493 struct kvec iov = {
494 .iov_base = buf,
495 .iov_len = size,
496 };
497 struct msghdr msg = {
498 .msg_iovlen = 1,
499 .msg_iov = (struct iovec *)&iov,
500 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
501 };
502 int rv;
503
504 oldfs = get_fs();
505 set_fs(KERNEL_DS);
506 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
507 set_fs(oldfs);
508
509 return rv;
510}
511
512static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
513{
514 mm_segment_t oldfs;
515 struct kvec iov = {
516 .iov_base = buf,
517 .iov_len = size,
518 };
519 struct msghdr msg = {
520 .msg_iovlen = 1,
521 .msg_iov = (struct iovec *)&iov,
522 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
523 };
524 int rv;
525
526 oldfs = get_fs();
527 set_fs(KERNEL_DS);
528
529 for (;;) {
530 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
531 if (rv == size)
532 break;
533
534 /* Note:
535 * ECONNRESET other side closed the connection
536 * ERESTARTSYS (on sock) we got a signal
537 */
538
539 if (rv < 0) {
540 if (rv == -ECONNRESET)
541 dev_info(DEV, "sock was reset by peer\n");
542 else if (rv != -ERESTARTSYS)
543 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
544 break;
545 } else if (rv == 0) {
546 dev_info(DEV, "sock was shut down by peer\n");
547 break;
548 } else {
549 /* signal came in, or peer/link went down,
550 * after we read a partial message
551 */
552 /* D_ASSERT(signal_pending(current)); */
553 break;
554 }
555 };
556
557 set_fs(oldfs);
558
559 if (rv != size)
560 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
561
562 return rv;
563}
564
/* Actively try to establish one TCP connection to the peer.
 *
 * Holds a net_conf reference for the duration of the attempt.  The socket
 * is bound to the configured local address (with port 0) before
 * connecting; send and receive timeouts are set to try_connect_int.
 *
 * Returns the connected socket, or NULL on failure.  Failures of socket
 * creation or bind that are not in the list of transient errors below
 * force the connection state to C_DISCONNECTING; a failing connect() never
 * does. */
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr *my_addr;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(mdev))
		return NULL;

	my_addr = (struct sockaddr *)mdev->net_conf->my_addr;

	what = "sock_create_kern";
	err = sock_create_kern(my_addr->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
	sock->sk->sk_rcvtimeo = mdev->net_conf->try_connect_int*HZ;

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, my_addr,
	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
	if (my_addr->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      mdev->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

out:
	if (err >= 0)
		goto done;

	if (sock) {
		sock_release(sock);
		sock = NULL;
	}

	switch (-err) {
	/* timeout, busy, signal pending */
	case ETIMEDOUT:
	case EAGAIN:
	case EINPROGRESS:
	case EINTR:
	case ERESTARTSYS:
	/* peer not (yet) available, network problem */
	case ECONNREFUSED:
	case ENETUNREACH:
	case EHOSTDOWN:
	case EHOSTUNREACH:
		disconnect_on_error = 0;
		break;
	default:
		dev_err(DEV, "%s failed, err = %d\n", what, err);
	}

	if (disconnect_on_error)
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));

done:
	put_net_conf(mdev);
	return sock;
}
640
641static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
642{
643 int timeo, err;
644 struct socket *s_estab = NULL, *s_listen;
645 const char *what;
646
647 if (!get_net_conf(mdev))
648 return NULL;
649
650 what = "sock_create_kern";
651 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
652 SOCK_STREAM, IPPROTO_TCP, &s_listen);
653 if (err) {
654 s_listen = NULL;
655 goto out;
656 }
657
658 timeo = mdev->net_conf->try_connect_int * HZ;
659 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
660
661 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
662 s_listen->sk->sk_rcvtimeo = timeo;
663 s_listen->sk->sk_sndtimeo = timeo;
664
665 what = "bind before listen";
666 err = s_listen->ops->bind(s_listen,
667 (struct sockaddr *) mdev->net_conf->my_addr,
668 mdev->net_conf->my_addr_len);
669 if (err < 0)
670 goto out;
671
672 err = drbd_accept(mdev, &what, s_listen, &s_estab);
673
674out:
675 if (s_listen)
676 sock_release(s_listen);
677 if (err < 0) {
678 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
679 dev_err(DEV, "%s failed, err = %d\n", what, err);
680 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
681 }
682 }
683 put_net_conf(mdev);
684
685 return s_estab;
686}
687
688static int drbd_send_fp(struct drbd_conf *mdev,
689 struct socket *sock, enum drbd_packets cmd)
690{
691 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692
693 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
694}
695
696static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
697{
698 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
699 int rr;
700
701 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
702
703 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
704 return be16_to_cpu(h->command);
705
706 return 0xffff;
707}
708
709/**
710 * drbd_socket_okay() - Free the socket if its connection is not okay
711 * @mdev: DRBD device.
712 * @sock: pointer to the pointer to the socket.
713 */
714static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
715{
716 int rr;
717 char tb[4];
718
719 if (!*sock)
720 return FALSE;
721
722 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
723
724 if (rr > 0 || rr == -EAGAIN) {
725 return TRUE;
726 } else {
727 sock_release(*sock);
728 *sock = NULL;
729 return FALSE;
730 }
731}
732
733/*
734 * return values:
735 * 1 yes, we have a valid connection
736 * 0 oops, did not work out, please try again
737 * -1 peer talks different language,
738 * no point in trying again, please go standalone.
739 * -2 We do not have a network config...
740 */
741static int drbd_connect(struct drbd_conf *mdev)
742{
743 struct socket *s, *sock, *msock;
744 int try, h, ok;
745
746 D_ASSERT(!mdev->data.socket);
747
748 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
749 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
750
751 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
752 return -2;
753
754 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
755
756 sock = NULL;
757 msock = NULL;
758
759 do {
760 for (try = 0;;) {
761 /* 3 tries, this should take less than a second! */
762 s = drbd_try_connect(mdev);
763 if (s || ++try >= 3)
764 break;
765 /* give the other side time to call bind() & listen() */
766 __set_current_state(TASK_INTERRUPTIBLE);
767 schedule_timeout(HZ / 10);
768 }
769
770 if (s) {
771 if (!sock) {
772 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
773 sock = s;
774 s = NULL;
775 } else if (!msock) {
776 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
777 msock = s;
778 s = NULL;
779 } else {
780 dev_err(DEV, "Logic error in drbd_connect()\n");
781 goto out_release_sockets;
782 }
783 }
784
785 if (sock && msock) {
786 __set_current_state(TASK_INTERRUPTIBLE);
787 schedule_timeout(HZ / 10);
788 ok = drbd_socket_okay(mdev, &sock);
789 ok = drbd_socket_okay(mdev, &msock) && ok;
790 if (ok)
791 break;
792 }
793
794retry:
795 s = drbd_wait_for_connect(mdev);
796 if (s) {
797 try = drbd_recv_fp(mdev, s);
798 drbd_socket_okay(mdev, &sock);
799 drbd_socket_okay(mdev, &msock);
800 switch (try) {
801 case P_HAND_SHAKE_S:
802 if (sock) {
803 dev_warn(DEV, "initial packet S crossed\n");
804 sock_release(sock);
805 }
806 sock = s;
807 break;
808 case P_HAND_SHAKE_M:
809 if (msock) {
810 dev_warn(DEV, "initial packet M crossed\n");
811 sock_release(msock);
812 }
813 msock = s;
814 set_bit(DISCARD_CONCURRENT, &mdev->flags);
815 break;
816 default:
817 dev_warn(DEV, "Error receiving initial packet\n");
818 sock_release(s);
819 if (random32() & 1)
820 goto retry;
821 }
822 }
823
824 if (mdev->state.conn <= C_DISCONNECTING)
825 goto out_release_sockets;
826 if (signal_pending(current)) {
827 flush_signals(current);
828 smp_rmb();
829 if (get_t_state(&mdev->receiver) == Exiting)
830 goto out_release_sockets;
831 }
832
833 if (sock && msock) {
834 ok = drbd_socket_okay(mdev, &sock);
835 ok = drbd_socket_okay(mdev, &msock) && ok;
836 if (ok)
837 break;
838 }
839 } while (1);
840
841 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
842 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
843
844 sock->sk->sk_allocation = GFP_NOIO;
845 msock->sk->sk_allocation = GFP_NOIO;
846
847 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
848 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
849
850 if (mdev->net_conf->sndbuf_size) {
851 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
852 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
853 }
854
855 if (mdev->net_conf->rcvbuf_size) {
856 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
857 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
858 }
859
860 /* NOT YET ...
861 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
863 * first set it to the P_HAND_SHAKE timeout,
864 * which we set to 4x the configured ping_timeout. */
865 sock->sk->sk_sndtimeo =
866 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
867
868 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
869 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
870
871 /* we don't want delays.
872 * we use TCP_CORK where apropriate, though */
873 drbd_tcp_nodelay(sock);
874 drbd_tcp_nodelay(msock);
875
876 mdev->data.socket = sock;
877 mdev->meta.socket = msock;
878 mdev->last_received = jiffies;
879
880 D_ASSERT(mdev->asender.task == NULL);
881
882 h = drbd_do_handshake(mdev);
883 if (h <= 0)
884 return h;
885
886 if (mdev->cram_hmac_tfm) {
887 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
888 if (!drbd_do_auth(mdev)) {
889 dev_err(DEV, "Authentication of peer failed\n");
890 return -1;
891 }
892 }
893
894 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
895 return 0;
896
897 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
898 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
899
900 atomic_set(&mdev->packet_seq, 0);
901 mdev->peer_seq = 0;
902
903 drbd_thread_start(&mdev->asender);
904
905 drbd_send_protocol(mdev);
906 drbd_send_sync_param(mdev, &mdev->sync_conf);
907 drbd_send_sizes(mdev, 0);
908 drbd_send_uuids(mdev);
909 drbd_send_state(mdev);
910 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
911 clear_bit(RESIZE_PENDING, &mdev->flags);
912
913 return 1;
914
915out_release_sockets:
916 if (sock)
917 sock_release(sock);
918 if (msock)
919 sock_release(msock);
920 return -1;
921}
922
923static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
924{
925 int r;
926
927 r = drbd_recv(mdev, h, sizeof(*h));
928
929 if (unlikely(r != sizeof(*h))) {
930 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
931 return FALSE;
932 };
933 h->command = be16_to_cpu(h->command);
934 h->length = be16_to_cpu(h->length);
935 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
936 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
937 (long)be32_to_cpu(h->magic),
938 h->command, h->length);
939 return FALSE;
940 }
941 mdev->last_received = jiffies;
942
943 return TRUE;
944}
945
946static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
947{
948 int rv;
949
950 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
951 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
952 if (rv) {
953 dev_err(DEV, "local disk flush failed with status %d\n", rv);
954 /* would rather check on EOPNOTSUPP, but that is not reliable.
955 * don't try again for ANY return value != 0
956 * if (rv == -EOPNOTSUPP) */
957 drbd_bump_write_ordering(mdev, WO_drain_io);
958 }
959 put_ldev(mdev);
960 }
961
962 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
963}
964
965static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
966{
967 struct flush_work *fw = (struct flush_work *)w;
968 struct drbd_epoch *epoch = fw->epoch;
969
970 kfree(w);
971
972 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
973 drbd_flush_after_epoch(mdev, epoch);
974
975 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
976 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
977
978 return 1;
979}
980
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event, optionally or-ed with EV_CLEANUP.
 *
 * Applies @ev to @epoch under mdev->epoch_lock and then checks whether the
 * epoch can be finished.  Finishing an epoch sends the barrier ack (unless
 * EV_CLEANUP is set) and either frees the epoch or, if it is
 * mdev->current_epoch, resets it for reuse.  After an epoch was freed, the
 * next epoch in the list is examined with EV_BECAME_LAST.
 *
 * Returns FE_STILL_LIVE if the epoch passed in was not finished,
 * FE_DESTROYED if it was freed, FE_RECYCLED if it was reset for reuse.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int finish, epoch_size;
	struct drbd_epoch *next_epoch;
	int schedule_flush = 0;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;
		finish = 0;

		/* snapshot; also used after the lock was dropped below */
		epoch_size = atomic_read(&epoch->epoch_size);

		/* step 1: apply the event to the epoch's state */
		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);

			/* Special case: If we just switched from WO_bio_barrier to
			   WO_bdev_flush we should not finish the current epoch */
			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
			    mdev->write_ordering != WO_bio_barrier &&
			    epoch == mdev->current_epoch)
				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
			break;
		case EV_BARRIER_DONE:
			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		trace_drbd_epoch(mdev, epoch, ev);

		/* step 2: decide whether to finish the epoch or to schedule
		 * a flush for it */
		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
		    epoch->list.prev == &mdev->current_epoch->list &&
		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
			/* Nearly all conditions are met to finish that epoch... */
			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
			    mdev->write_ordering == WO_none ||
			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
			    ev & EV_CLEANUP) {
				finish = 1;
				set_bit(DE_IS_FINISHING, &epoch->flags);
			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
				 mdev->write_ordering == WO_bio_barrier) {
				/* this reference is dropped again by the
				 * EV_PUT that follows the flush */
				atomic_inc(&epoch->active);
				schedule_flush = 1;
			}
		}
		/* step 3: finish the epoch */
		if (finish) {
			if (!(ev & EV_CLEANUP)) {
				/* the lock is dropped while sending; DE_IS_FINISHING
				 * is set, so the test above keeps a concurrent
				 * caller from finishing this epoch again */
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				/* remember the successor before unlinking and
				 * freeing this epoch */
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				trace_drbd_epoch(mdev, epoch, EV_TRACE_FREE);
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				/* the current epoch is reused, not freed */
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	/* schedule_flush is only ever set in an iteration that does not
	 * finish, i.e. the last one, so epoch is the epoch it was set for */
	if (schedule_flush) {
		struct flush_work *fw;
		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
		if (fw) {
			trace_drbd_epoch(mdev, epoch, EV_TRACE_FLUSH);
			fw->w.cb = w_flush;
			fw->epoch = epoch;
			drbd_queue_work(&mdev->data.work, &fw->w);
		} else {
			/* no memory for the work item: do not flush, but
			 * still deliver the two events w_flush() would
			 * have caused */
			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
			/* That is not a recursion, only one level */
			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
		}
	}

	return rv;
}
1099
1100/**
1101 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1102 * @mdev: DRBD device.
1103 * @wo: Write ordering method to try.
1104 */
1105void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1106{
1107 enum write_ordering_e pwo;
1108 static char *write_ordering_str[] = {
1109 [WO_none] = "none",
1110 [WO_drain_io] = "drain",
1111 [WO_bdev_flush] = "flush",
1112 [WO_bio_barrier] = "barrier",
1113 };
1114
1115 pwo = mdev->write_ordering;
1116 wo = min(pwo, wo);
1117 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1118 wo = WO_bdev_flush;
1119 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1120 wo = WO_drain_io;
1121 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1122 wo = WO_none;
1123 mdev->write_ordering = wo;
1124 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1125 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1126}
1127
1128/**
1129 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1130 * @mdev: DRBD device.
1131 * @w: work object.
1132 * @cancel: The connection will be closed anyways (unused in this callback)
1133 */
1134int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1135{
1136 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1137 struct bio *bio = e->private_bio;
1138
1139 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1140 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1141 so that we can finish that epoch in drbd_may_finish_epoch().
1142 That is necessary if we already have a long chain of Epochs, before
1143 we realize that BIO_RW_BARRIER is actually not supported */
1144
1145 /* As long as the -ENOTSUPP on the barrier is reported immediately
1146 that will never trigger. If it is reported late, we will just
1147 print that warning and continue correctly for all future requests
1148 with WO_bdev_flush */
1149 if (previous_epoch(mdev, e->epoch))
1150 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1151
1152 /* prepare bio for re-submit,
1153 * re-init volatile members */
1154 /* we still have a local reference,
1155 * get_ldev was done in receive_Data. */
1156 bio->bi_bdev = mdev->ldev->backing_bdev;
1157 bio->bi_sector = e->sector;
1158 bio->bi_size = e->size;
1159 bio->bi_idx = 0;
1160
1161 bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1162 bio->bi_flags |= 1 << BIO_UPTODATE;
1163
1164 /* don't know whether this is necessary: */
1165 bio->bi_phys_segments = 0;
1166 bio->bi_next = NULL;
1167
1168 /* these should be unchanged: */
1169 /* bio->bi_end_io = drbd_endio_write_sec; */
1170 /* bio->bi_vcnt = whatever; */
1171
1172 e->w.cb = e_end_block;
1173
1174 /* This is no longer a barrier request. */
1175 bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1176
1177 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1178
1179 return 1;
1180}
1181
1182static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1183{
1184 int rv, issue_flush;
1185 struct p_barrier *p = (struct p_barrier *)h;
1186 struct drbd_epoch *epoch;
1187
1188 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1189
1190 rv = drbd_recv(mdev, h->payload, h->length);
1191 ERR_IF(rv != h->length) return FALSE;
1192
1193 inc_unacked(mdev);
1194
1195 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1196 drbd_kick_lo(mdev);
1197
1198 mdev->current_epoch->barrier_nr = p->barrier;
1199 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1200
1201 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1202 * the activity log, which means it would not be resynced in case the
1203 * R_PRIMARY crashes now.
1204 * Therefore we must send the barrier_ack after the barrier request was
1205 * completed. */
1206 switch (mdev->write_ordering) {
1207 case WO_bio_barrier:
1208 case WO_none:
1209 if (rv == FE_RECYCLED)
1210 return TRUE;
1211 break;
1212
1213 case WO_bdev_flush:
1214 case WO_drain_io:
1215 D_ASSERT(rv == FE_STILL_LIVE);
1216 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1217 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1218 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1219 if (rv == FE_RECYCLED)
1220 return TRUE;
1221
1222 /* The asender will send all the ACKs and barrier ACKs out, since
1223 all EEs moved from the active_ee to the done_ee. We need to
1224 provide a new epoch object for the EEs that come in soon */
1225 break;
1226 }
1227
1228 /* receiver context, in the writeout path of the other node.
1229 * avoid potential distributed deadlock */
1230 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1231 if (!epoch) {
1232 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1233 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1234 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1235 if (issue_flush) {
1236 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1237 if (rv == FE_RECYCLED)
1238 return TRUE;
1239 }
1240
1241 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1242
1243 return TRUE;
1244 }
1245
1246 epoch->flags = 0;
1247 atomic_set(&epoch->epoch_size, 0);
1248 atomic_set(&epoch->active, 0);
1249
1250 spin_lock(&mdev->epoch_lock);
1251 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1252 list_add(&epoch->list, &mdev->current_epoch->list);
1253 mdev->current_epoch = epoch;
1254 mdev->epochs++;
1255 trace_drbd_epoch(mdev, epoch, EV_TRACE_ALLOC);
1256 } else {
1257 /* The current_epoch got recycled while we allocated this one... */
1258 kfree(epoch);
1259 }
1260 spin_unlock(&mdev->epoch_lock);
1261
1262 return TRUE;
1263}
1264
1265/* used from receive_RSDataReply (recv_resync_read)
1266 * and from receive_Data */
1267static struct drbd_epoch_entry *
1268read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1269{
1270 struct drbd_epoch_entry *e;
1271 struct bio_vec *bvec;
1272 struct page *page;
1273 struct bio *bio;
1274 int dgs, ds, i, rr;
1275 void *dig_in = mdev->int_dig_in;
1276 void *dig_vv = mdev->int_dig_vv;
1277
1278 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1279 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1280
1281 if (dgs) {
1282 rr = drbd_recv(mdev, dig_in, dgs);
1283 if (rr != dgs) {
1284 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1285 rr, dgs);
1286 return NULL;
1287 }
1288 }
1289
1290 data_size -= dgs;
1291
1292 ERR_IF(data_size & 0x1ff) return NULL;
1293 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1294
1295 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1296 * "criss-cross" setup, that might cause write-out on some other DRBD,
1297 * which in turn might block on the other node at this very place. */
1298 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1299 if (!e)
1300 return NULL;
1301 bio = e->private_bio;
1302 ds = data_size;
1303 bio_for_each_segment(bvec, bio, i) {
1304 page = bvec->bv_page;
1305 rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1306 kunmap(page);
1307 if (rr != min_t(int, ds, PAGE_SIZE)) {
1308 drbd_free_ee(mdev, e);
1309 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1310 rr, min_t(int, ds, PAGE_SIZE));
1311 return NULL;
1312 }
1313 ds -= rr;
1314 }
1315
1316 if (dgs) {
1317 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1318 if (memcmp(dig_in, dig_vv, dgs)) {
1319 dev_err(DEV, "Digest integrity check FAILED.\n");
1320 drbd_bcast_ee(mdev, "digest failed",
1321 dgs, dig_in, dig_vv, e);
1322 drbd_free_ee(mdev, e);
1323 return NULL;
1324 }
1325 }
1326 mdev->recv_cnt += data_size>>9;
1327 return e;
1328}
1329
1330/* drbd_drain_block() just takes a data block
1331 * out of the socket input buffer, and discards it.
1332 */
1333static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1334{
1335 struct page *page;
1336 int rr, rv = 1;
1337 void *data;
1338
1339 page = drbd_pp_alloc(mdev, 1);
1340
1341 data = kmap(page);
1342 while (data_size) {
1343 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1344 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1345 rv = 0;
1346 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1347 rr, min_t(int, data_size, PAGE_SIZE));
1348 break;
1349 }
1350 data_size -= rr;
1351 }
1352 kunmap(page);
1353 drbd_pp_free(mdev, page);
1354 return rv;
1355}
1356
1357static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1358 sector_t sector, int data_size)
1359{
1360 struct bio_vec *bvec;
1361 struct bio *bio;
1362 int dgs, rr, i, expect;
1363 void *dig_in = mdev->int_dig_in;
1364 void *dig_vv = mdev->int_dig_vv;
1365
1366 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1367 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1368
1369 if (dgs) {
1370 rr = drbd_recv(mdev, dig_in, dgs);
1371 if (rr != dgs) {
1372 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1373 rr, dgs);
1374 return 0;
1375 }
1376 }
1377
1378 data_size -= dgs;
1379
1380 /* optimistically update recv_cnt. if receiving fails below,
1381 * we disconnect anyways, and counters will be reset. */
1382 mdev->recv_cnt += data_size>>9;
1383
1384 bio = req->master_bio;
1385 D_ASSERT(sector == bio->bi_sector);
1386
1387 bio_for_each_segment(bvec, bio, i) {
1388 expect = min_t(int, data_size, bvec->bv_len);
1389 rr = drbd_recv(mdev,
1390 kmap(bvec->bv_page)+bvec->bv_offset,
1391 expect);
1392 kunmap(bvec->bv_page);
1393 if (rr != expect) {
1394 dev_warn(DEV, "short read receiving data reply: "
1395 "read %d expected %d\n",
1396 rr, expect);
1397 return 0;
1398 }
1399 data_size -= rr;
1400 }
1401
1402 if (dgs) {
1403 drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1404 if (memcmp(dig_in, dig_vv, dgs)) {
1405 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1406 return 0;
1407 }
1408 }
1409
1410 D_ASSERT(data_size == 0);
1411 return 1;
1412}
1413
1414/* e_end_resync_block() is called via
1415 * drbd_process_done_ee() by asender only */
1416static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1417{
1418 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1419 sector_t sector = e->sector;
1420 int ok;
1421
1422 D_ASSERT(hlist_unhashed(&e->colision));
1423
1424 if (likely(drbd_bio_uptodate(e->private_bio))) {
1425 drbd_set_in_sync(mdev, sector, e->size);
1426 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1427 } else {
1428 /* Record failure to sync */
1429 drbd_rs_failed_io(mdev, sector, e->size);
1430
1431 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1432 }
1433 dec_unacked(mdev);
1434
1435 return ok;
1436}
1437
1438static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1439{
1440 struct drbd_epoch_entry *e;
1441
1442 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1443 if (!e) {
1444 put_ldev(mdev);
1445 return FALSE;
1446 }
1447
1448 dec_rs_pending(mdev);
1449
1450 e->private_bio->bi_end_io = drbd_endio_write_sec;
1451 e->private_bio->bi_rw = WRITE;
1452 e->w.cb = e_end_resync_block;
1453
1454 inc_unacked(mdev);
1455 /* corresponding dec_unacked() in e_end_resync_block()
1456 * respective _drbd_clear_done_ee */
1457
1458 spin_lock_irq(&mdev->req_lock);
1459 list_add(&e->w.list, &mdev->sync_ee);
1460 spin_unlock_irq(&mdev->req_lock);
1461
1462 trace_drbd_ee(mdev, e, "submitting for (rs)write");
1463 trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
1464 drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1465 /* accounting done in endio */
1466
1467 maybe_kick_lo(mdev);
1468 return TRUE;
1469}
1470
1471static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1472{
1473 struct drbd_request *req;
1474 sector_t sector;
1475 unsigned int header_size, data_size;
1476 int ok;
1477 struct p_data *p = (struct p_data *)h;
1478
1479 header_size = sizeof(*p) - sizeof(*h);
1480 data_size = h->length - header_size;
1481
1482 ERR_IF(data_size == 0) return FALSE;
1483
1484 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1485 return FALSE;
1486
1487 sector = be64_to_cpu(p->sector);
1488
1489 spin_lock_irq(&mdev->req_lock);
1490 req = _ar_id_to_req(mdev, p->block_id, sector);
1491 spin_unlock_irq(&mdev->req_lock);
1492 if (unlikely(!req)) {
1493 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1494 return FALSE;
1495 }
1496
1497 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1498 * special casing it there for the various failure cases.
1499 * still no race with drbd_fail_pending_reads */
1500 ok = recv_dless_read(mdev, req, sector, data_size);
1501
1502 if (ok)
1503 req_mod(req, data_received);
1504 /* else: nothing. handled from drbd_disconnect...
1505 * I don't think we may complete this just yet
1506 * in case we are "on-disconnect: freeze" */
1507
1508 return ok;
1509}
1510
1511static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1512{
1513 sector_t sector;
1514 unsigned int header_size, data_size;
1515 int ok;
1516 struct p_data *p = (struct p_data *)h;
1517
1518 header_size = sizeof(*p) - sizeof(*h);
1519 data_size = h->length - header_size;
1520
1521 ERR_IF(data_size == 0) return FALSE;
1522
1523 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1524 return FALSE;
1525
1526 sector = be64_to_cpu(p->sector);
1527 D_ASSERT(p->block_id == ID_SYNCER);
1528
1529 if (get_ldev(mdev)) {
1530 /* data is submitted to disk within recv_resync_read.
1531 * corresponding put_ldev done below on error,
1532 * or in drbd_endio_write_sec. */
1533 ok = recv_resync_read(mdev, sector, data_size);
1534 } else {
1535 if (__ratelimit(&drbd_ratelimit_state))
1536 dev_err(DEV, "Can not write resync data to local disk.\n");
1537
1538 ok = drbd_drain_block(mdev, data_size);
1539
1540 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1541 }
1542
1543 return ok;
1544}
1545
1546/* e_end_block() is called via drbd_process_done_ee().
1547 * this means this function only runs in the asender thread
1548 */
1549static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1550{
1551 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1552 sector_t sector = e->sector;
1553 struct drbd_epoch *epoch;
1554 int ok = 1, pcmd;
1555
1556 if (e->flags & EE_IS_BARRIER) {
1557 epoch = previous_epoch(mdev, e->epoch);
1558 if (epoch)
1559 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1560 }
1561
1562 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1563 if (likely(drbd_bio_uptodate(e->private_bio))) {
1564 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1565 mdev->state.conn <= C_PAUSED_SYNC_T &&
1566 e->flags & EE_MAY_SET_IN_SYNC) ?
1567 P_RS_WRITE_ACK : P_WRITE_ACK;
1568 ok &= drbd_send_ack(mdev, pcmd, e);
1569 if (pcmd == P_RS_WRITE_ACK)
1570 drbd_set_in_sync(mdev, sector, e->size);
1571 } else {
1572 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1573 /* we expect it to be marked out of sync anyways...
1574 * maybe assert this? */
1575 }
1576 dec_unacked(mdev);
1577 }
1578 /* we delete from the conflict detection hash _after_ we sent out the
1579 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1580 if (mdev->net_conf->two_primaries) {
1581 spin_lock_irq(&mdev->req_lock);
1582 D_ASSERT(!hlist_unhashed(&e->colision));
1583 hlist_del_init(&e->colision);
1584 spin_unlock_irq(&mdev->req_lock);
1585 } else {
1586 D_ASSERT(hlist_unhashed(&e->colision));
1587 }
1588
1589 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1590
1591 return ok;
1592}
1593
1594static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1595{
1596 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1597 int ok = 1;
1598
1599 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1600 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1601
1602 spin_lock_irq(&mdev->req_lock);
1603 D_ASSERT(!hlist_unhashed(&e->colision));
1604 hlist_del_init(&e->colision);
1605 spin_unlock_irq(&mdev->req_lock);
1606
1607 dec_unacked(mdev);
1608
1609 return ok;
1610}
1611
1612/* Called from receive_Data.
1613 * Synchronize packets on sock with packets on msock.
1614 *
1615 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1616 * packet traveling on msock, they are still processed in the order they have
1617 * been sent.
1618 *
1619 * Note: we don't care for Ack packets overtaking P_DATA packets.
1620 *
1621 * In case packet_seq is larger than mdev->peer_seq number, there are
1622 * outstanding packets on the msock. We wait for them to arrive.
1623 * In case we are the logically next packet, we update mdev->peer_seq
1624 * ourselves. Correctly handles 32bit wrap around.
1625 *
1626 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1627 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1628 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1629 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1630 *
1631 * returns 0 if we may process the packet,
1632 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1633static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1634{
1635 DEFINE_WAIT(wait);
1636 unsigned int p_seq;
1637 long timeout;
1638 int ret = 0;
1639 spin_lock(&mdev->peer_seq_lock);
1640 for (;;) {
1641 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1642 if (seq_le(packet_seq, mdev->peer_seq+1))
1643 break;
1644 if (signal_pending(current)) {
1645 ret = -ERESTARTSYS;
1646 break;
1647 }
1648 p_seq = mdev->peer_seq;
1649 spin_unlock(&mdev->peer_seq_lock);
1650 timeout = schedule_timeout(30*HZ);
1651 spin_lock(&mdev->peer_seq_lock);
1652 if (timeout == 0 && p_seq == mdev->peer_seq) {
1653 ret = -ETIMEDOUT;
1654 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1655 break;
1656 }
1657 }
1658 finish_wait(&mdev->seq_wait, &wait);
1659 if (mdev->peer_seq+1 == packet_seq)
1660 mdev->peer_seq++;
1661 spin_unlock(&mdev->peer_seq_lock);
1662 return ret;
1663}
1664
1665/* mirrored write */
1666static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1667{
1668 sector_t sector;
1669 struct drbd_epoch_entry *e;
1670 struct p_data *p = (struct p_data *)h;
1671 int header_size, data_size;
1672 int rw = WRITE;
1673 u32 dp_flags;
1674
1675 header_size = sizeof(*p) - sizeof(*h);
1676 data_size = h->length - header_size;
1677
1678 ERR_IF(data_size == 0) return FALSE;
1679
1680 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1681 return FALSE;
1682
1683 if (!get_ldev(mdev)) {
1684 if (__ratelimit(&drbd_ratelimit_state))
1685 dev_err(DEV, "Can not write mirrored data block "
1686 "to local disk.\n");
1687 spin_lock(&mdev->peer_seq_lock);
1688 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1689 mdev->peer_seq++;
1690 spin_unlock(&mdev->peer_seq_lock);
1691
1692 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1693 atomic_inc(&mdev->current_epoch->epoch_size);
1694 return drbd_drain_block(mdev, data_size);
1695 }
1696
1697 /* get_ldev(mdev) successful.
1698 * Corresponding put_ldev done either below (on various errors),
1699 * or in drbd_endio_write_sec, if we successfully submit the data at
1700 * the end of this function. */
1701
1702 sector = be64_to_cpu(p->sector);
1703 e = read_in_block(mdev, p->block_id, sector, data_size);
1704 if (!e) {
1705 put_ldev(mdev);
1706 return FALSE;
1707 }
1708
1709 e->private_bio->bi_end_io = drbd_endio_write_sec;
1710 e->w.cb = e_end_block;
1711
1712 spin_lock(&mdev->epoch_lock);
1713 e->epoch = mdev->current_epoch;
1714 atomic_inc(&e->epoch->epoch_size);
1715 atomic_inc(&e->epoch->active);
1716
1717 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1718 struct drbd_epoch *epoch;
1719 /* Issue a barrier if we start a new epoch, and the previous epoch
1720 was not a epoch containing a single request which already was
1721 a Barrier. */
1722 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1723 if (epoch == e->epoch) {
1724 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1725 trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER);
1726 rw |= (1<<BIO_RW_BARRIER);
1727 e->flags |= EE_IS_BARRIER;
1728 } else {
1729 if (atomic_read(&epoch->epoch_size) > 1 ||
1730 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1731 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1732 trace_drbd_epoch(mdev, epoch, EV_TRACE_SETTING_BI);
1733 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1734 trace_drbd_epoch(mdev, e->epoch, EV_TRACE_ADD_BARRIER);
1735 rw |= (1<<BIO_RW_BARRIER);
1736 e->flags |= EE_IS_BARRIER;
1737 }
1738 }
1739 }
1740 spin_unlock(&mdev->epoch_lock);
1741
1742 dp_flags = be32_to_cpu(p->dp_flags);
1743 if (dp_flags & DP_HARDBARRIER) {
1744 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1745 /* rw |= (1<<BIO_RW_BARRIER); */
1746 }
1747 if (dp_flags & DP_RW_SYNC)
1748 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1749 if (dp_flags & DP_MAY_SET_IN_SYNC)
1750 e->flags |= EE_MAY_SET_IN_SYNC;
1751
1752 /* I'm the receiver, I do hold a net_cnt reference. */
1753 if (!mdev->net_conf->two_primaries) {
1754 spin_lock_irq(&mdev->req_lock);
1755 } else {
1756 /* don't get the req_lock yet,
1757 * we may sleep in drbd_wait_peer_seq */
1758 const int size = e->size;
1759 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1760 DEFINE_WAIT(wait);
1761 struct drbd_request *i;
1762 struct hlist_node *n;
1763 struct hlist_head *slot;
1764 int first;
1765
1766 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1767 BUG_ON(mdev->ee_hash == NULL);
1768 BUG_ON(mdev->tl_hash == NULL);
1769
1770 /* conflict detection and handling:
1771 * 1. wait on the sequence number,
1772 * in case this data packet overtook ACK packets.
1773 * 2. check our hash tables for conflicting requests.
1774 * we only need to walk the tl_hash, since an ee can not
1775 * have a conflict with an other ee: on the submitting
1776 * node, the corresponding req had already been conflicting,
1777 * and a conflicting req is never sent.
1778 *
1779 * Note: for two_primaries, we are protocol C,
1780 * so there cannot be any request that is DONE
1781 * but still on the transfer log.
1782 *
1783 * unconditionally add to the ee_hash.
1784 *
1785 * if no conflicting request is found:
1786 * submit.
1787 *
1788 * if any conflicting request is found
1789 * that has not yet been acked,
1790 * AND I have the "discard concurrent writes" flag:
1791 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1792 *
1793 * if any conflicting request is found:
1794 * block the receiver, waiting on misc_wait
1795 * until no more conflicting requests are there,
1796 * or we get interrupted (disconnect).
1797 *
1798 * we do not just write after local io completion of those
1799 * requests, but only after req is done completely, i.e.
1800 * we wait for the P_DISCARD_ACK to arrive!
1801 *
1802 * then proceed normally, i.e. submit.
1803 */
1804 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1805 goto out_interrupted;
1806
1807 spin_lock_irq(&mdev->req_lock);
1808
1809 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1810
1811#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1812 slot = tl_hash_slot(mdev, sector);
1813 first = 1;
1814 for (;;) {
1815 int have_unacked = 0;
1816 int have_conflict = 0;
1817 prepare_to_wait(&mdev->misc_wait, &wait,
1818 TASK_INTERRUPTIBLE);
1819 hlist_for_each_entry(i, n, slot, colision) {
1820 if (OVERLAPS) {
1821 /* only ALERT on first iteration,
1822 * we may be woken up early... */
1823 if (first)
1824 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1825 " new: %llus +%u; pending: %llus +%u\n",
1826 current->comm, current->pid,
1827 (unsigned long long)sector, size,
1828 (unsigned long long)i->sector, i->size);
1829 if (i->rq_state & RQ_NET_PENDING)
1830 ++have_unacked;
1831 ++have_conflict;
1832 }
1833 }
1834#undef OVERLAPS
1835 if (!have_conflict)
1836 break;
1837
1838 /* Discard Ack only for the _first_ iteration */
1839 if (first && discard && have_unacked) {
1840 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1841 (unsigned long long)sector);
1842 inc_unacked(mdev);
1843 e->w.cb = e_send_discard_ack;
1844 list_add_tail(&e->w.list, &mdev->done_ee);
1845
1846 spin_unlock_irq(&mdev->req_lock);
1847
1848 /* we could probably send that P_DISCARD_ACK ourselves,
1849 * but I don't like the receiver using the msock */
1850
1851 put_ldev(mdev);
1852 wake_asender(mdev);
1853 finish_wait(&mdev->misc_wait, &wait);
1854 return TRUE;
1855 }
1856
1857 if (signal_pending(current)) {
1858 hlist_del_init(&e->colision);
1859
1860 spin_unlock_irq(&mdev->req_lock);
1861
1862 finish_wait(&mdev->misc_wait, &wait);
1863 goto out_interrupted;
1864 }
1865
1866 spin_unlock_irq(&mdev->req_lock);
1867 if (first) {
1868 first = 0;
1869 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1870 "sec=%llus\n", (unsigned long long)sector);
1871 } else if (discard) {
1872 /* we had none on the first iteration.
1873 * there must be none now. */
1874 D_ASSERT(have_unacked == 0);
1875 }
1876 schedule();
1877 spin_lock_irq(&mdev->req_lock);
1878 }
1879 finish_wait(&mdev->misc_wait, &wait);
1880 }
1881
1882 list_add(&e->w.list, &mdev->active_ee);
1883 spin_unlock_irq(&mdev->req_lock);
1884
1885 switch (mdev->net_conf->wire_protocol) {
1886 case DRBD_PROT_C:
1887 inc_unacked(mdev);
1888 /* corresponding dec_unacked() in e_end_block()
1889 * respective _drbd_clear_done_ee */
1890 break;
1891 case DRBD_PROT_B:
1892 /* I really don't like it that the receiver thread
1893 * sends on the msock, but anyways */
1894 drbd_send_ack(mdev, P_RECV_ACK, e);
1895 break;
1896 case DRBD_PROT_A:
1897 /* nothing to do */
1898 break;
1899 }
1900
1901 if (mdev->state.pdsk == D_DISKLESS) {
1902 /* In case we have the only disk of the cluster, */
1903 drbd_set_out_of_sync(mdev, e->sector, e->size);
1904 e->flags |= EE_CALL_AL_COMPLETE_IO;
1905 drbd_al_begin_io(mdev, e->sector);
1906 }
1907
1908 e->private_bio->bi_rw = rw;
1909 trace_drbd_ee(mdev, e, "submitting for (data)write");
1910 trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
1911 drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1912 /* accounting done in endio */
1913
1914 maybe_kick_lo(mdev);
1915 return TRUE;
1916
1917out_interrupted:
1918 /* yes, the epoch_size now is imbalanced.
1919 * but we drop the connection anyways, so we don't have a chance to
1920 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1921 put_ldev(mdev);
1922 drbd_free_ee(mdev, e);
1923 return FALSE;
1924}
1925
1926static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1927{
1928 sector_t sector;
1929 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1930 struct drbd_epoch_entry *e;
1931 struct digest_info *di = NULL;
1932 int size, digest_size;
1933 unsigned int fault_type;
1934 struct p_block_req *p =
1935 (struct p_block_req *)h;
1936 const int brps = sizeof(*p)-sizeof(*h);
1937
1938 if (drbd_recv(mdev, h->payload, brps) != brps)
1939 return FALSE;
1940
1941 sector = be64_to_cpu(p->sector);
1942 size = be32_to_cpu(p->blksize);
1943
1944 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1945 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1946 (unsigned long long)sector, size);
1947 return FALSE;
1948 }
1949 if (sector + (size>>9) > capacity) {
1950 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1951 (unsigned long long)sector, size);
1952 return FALSE;
1953 }
1954
1955 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1956 if (__ratelimit(&drbd_ratelimit_state))
1957 dev_err(DEV, "Can not satisfy peer's read request, "
1958 "no local data.\n");
1959 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1960 P_NEG_RS_DREPLY , p);
1961 return TRUE;
1962 }
1963
1964 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1965 * "criss-cross" setup, that might cause write-out on some other DRBD,
1966 * which in turn might block on the other node at this very place. */
1967 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1968 if (!e) {
1969 put_ldev(mdev);
1970 return FALSE;
1971 }
1972
1973 e->private_bio->bi_rw = READ;
1974 e->private_bio->bi_end_io = drbd_endio_read_sec;
1975
1976 switch (h->command) {
1977 case P_DATA_REQUEST:
1978 e->w.cb = w_e_end_data_req;
1979 fault_type = DRBD_FAULT_DT_RD;
1980 break;
1981 case P_RS_DATA_REQUEST:
1982 e->w.cb = w_e_end_rsdata_req;
1983 fault_type = DRBD_FAULT_RS_RD;
1984 /* Eventually this should become asynchronously. Currently it
1985 * blocks the whole receiver just to delay the reading of a
1986 * resync data block.
1987 * the drbd_work_queue mechanism is made for this...
1988 */
1989 if (!drbd_rs_begin_io(mdev, sector)) {
1990 /* we have been interrupted,
1991 * probably connection lost! */
1992 D_ASSERT(signal_pending(current));
1993 goto out_free_e;
1994 }
1995 break;
1996
1997 case P_OV_REPLY:
1998 case P_CSUM_RS_REQUEST:
1999 fault_type = DRBD_FAULT_RS_RD;
2000 digest_size = h->length - brps ;
2001 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2002 if (!di)
2003 goto out_free_e;
2004
2005 di->digest_size = digest_size;
2006 di->digest = (((char *)di)+sizeof(struct digest_info));
2007
2008 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2009 goto out_free_e;
2010
2011 e->block_id = (u64)(unsigned long)di;
2012 if (h->command == P_CSUM_RS_REQUEST) {
2013 D_ASSERT(mdev->agreed_pro_version >= 89);
2014 e->w.cb = w_e_end_csum_rs_req;
2015 } else if (h->command == P_OV_REPLY) {
2016 e->w.cb = w_e_end_ov_reply;
2017 dec_rs_pending(mdev);
2018 break;
2019 }
2020
2021 if (!drbd_rs_begin_io(mdev, sector)) {
2022 /* we have been interrupted, probably connection lost! */
2023 D_ASSERT(signal_pending(current));
2024 goto out_free_e;
2025 }
2026 break;
2027
2028 case P_OV_REQUEST:
2029 if (mdev->state.conn >= C_CONNECTED &&
2030 mdev->state.conn != C_VERIFY_T)
2031 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2032 drbd_conn_str(mdev->state.conn));
2033 if (mdev->ov_start_sector == ~(sector_t)0 &&
2034 mdev->agreed_pro_version >= 90) {
2035 mdev->ov_start_sector = sector;
2036 mdev->ov_position = sector;
2037 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2038 dev_info(DEV, "Online Verify start sector: %llu\n",
2039 (unsigned long long)sector);
2040 }
2041 e->w.cb = w_e_end_ov_req;
2042 fault_type = DRBD_FAULT_RS_RD;
2043 /* Eventually this should become asynchronous. Currently it
2044 * blocks the whole receiver just to delay the reading of a
2045 * resync data block.
2046 * the drbd_work_queue mechanism is made for this...
2047 */
2048 if (!drbd_rs_begin_io(mdev, sector)) {
2049 /* we have been interrupted,
2050 * probably connection lost! */
2051 D_ASSERT(signal_pending(current));
2052 goto out_free_e;
2053 }
2054 break;
2055
2056
2057 default:
2058 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2059 cmdname(h->command));
2060 fault_type = DRBD_FAULT_MAX;
2061 }
2062
2063 spin_lock_irq(&mdev->req_lock);
2064 list_add(&e->w.list, &mdev->read_ee);
2065 spin_unlock_irq(&mdev->req_lock);
2066
2067 inc_unacked(mdev);
2068
2069 trace_drbd_ee(mdev, e, "submitting for read");
2070 trace_drbd_bio(mdev, "Sec", e->private_bio, 0, NULL);
2071 drbd_generic_make_request(mdev, fault_type, e->private_bio);
2072 maybe_kick_lo(mdev);
2073
2074 return TRUE;
2075
2076out_free_e:
2077 kfree(di);
2078 put_ldev(mdev);
2079 drbd_free_ee(mdev, e);
2080 return FALSE;
2081}
2082
2083static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2084{
2085 int self, peer, rv = -100;
2086 unsigned long ch_self, ch_peer;
2087
2088 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2089 peer = mdev->p_uuid[UI_BITMAP] & 1;
2090
2091 ch_peer = mdev->p_uuid[UI_SIZE];
2092 ch_self = mdev->comm_bm_set;
2093
2094 switch (mdev->net_conf->after_sb_0p) {
2095 case ASB_CONSENSUS:
2096 case ASB_DISCARD_SECONDARY:
2097 case ASB_CALL_HELPER:
2098 dev_err(DEV, "Configuration error.\n");
2099 break;
2100 case ASB_DISCONNECT:
2101 break;
2102 case ASB_DISCARD_YOUNGER_PRI:
2103 if (self == 0 && peer == 1) {
2104 rv = -1;
2105 break;
2106 }
2107 if (self == 1 && peer == 0) {
2108 rv = 1;
2109 break;
2110 }
2111 /* Else fall through to one of the other strategies... */
2112 case ASB_DISCARD_OLDER_PRI:
2113 if (self == 0 && peer == 1) {
2114 rv = 1;
2115 break;
2116 }
2117 if (self == 1 && peer == 0) {
2118 rv = -1;
2119 break;
2120 }
2121 /* Else fall through to one of the other strategies... */
2122 dev_warn(DEV, "Discard younger/older primary did not found a decision\n"
2123 "Using discard-least-changes instead\n");
2124 case ASB_DISCARD_ZERO_CHG:
2125 if (ch_peer == 0 && ch_self == 0) {
2126 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2127 ? -1 : 1;
2128 break;
2129 } else {
2130 if (ch_peer == 0) { rv = 1; break; }
2131 if (ch_self == 0) { rv = -1; break; }
2132 }
2133 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2134 break;
2135 case ASB_DISCARD_LEAST_CHG:
2136 if (ch_self < ch_peer)
2137 rv = -1;
2138 else if (ch_self > ch_peer)
2139 rv = 1;
2140 else /* ( ch_self == ch_peer ) */
2141 /* Well, then use something else. */
2142 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2143 ? -1 : 1;
2144 break;
2145 case ASB_DISCARD_LOCAL:
2146 rv = -1;
2147 break;
2148 case ASB_DISCARD_REMOTE:
2149 rv = 1;
2150 }
2151
2152 return rv;
2153}
2154
2155static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2156{
2157 int self, peer, hg, rv = -100;
2158
2159 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2160 peer = mdev->p_uuid[UI_BITMAP] & 1;
2161
2162 switch (mdev->net_conf->after_sb_1p) {
2163 case ASB_DISCARD_YOUNGER_PRI:
2164 case ASB_DISCARD_OLDER_PRI:
2165 case ASB_DISCARD_LEAST_CHG:
2166 case ASB_DISCARD_LOCAL:
2167 case ASB_DISCARD_REMOTE:
2168 dev_err(DEV, "Configuration error.\n");
2169 break;
2170 case ASB_DISCONNECT:
2171 break;
2172 case ASB_CONSENSUS:
2173 hg = drbd_asb_recover_0p(mdev);
2174 if (hg == -1 && mdev->state.role == R_SECONDARY)
2175 rv = hg;
2176 if (hg == 1 && mdev->state.role == R_PRIMARY)
2177 rv = hg;
2178 break;
2179 case ASB_VIOLENTLY:
2180 rv = drbd_asb_recover_0p(mdev);
2181 break;
2182 case ASB_DISCARD_SECONDARY:
2183 return mdev->state.role == R_PRIMARY ? 1 : -1;
2184 case ASB_CALL_HELPER:
2185 hg = drbd_asb_recover_0p(mdev);
2186 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2187 self = drbd_set_role(mdev, R_SECONDARY, 0);
2188 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2189 * we might be here in C_WF_REPORT_PARAMS which is transient.
2190 * we do not need to wait for the after state change work either. */
2191 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2192 if (self != SS_SUCCESS) {
2193 drbd_khelper(mdev, "pri-lost-after-sb");
2194 } else {
2195 dev_warn(DEV, "Successfully gave up primary role.\n");
2196 rv = hg;
2197 }
2198 } else
2199 rv = hg;
2200 }
2201
2202 return rv;
2203}
2204
2205static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2206{
2207 int self, peer, hg, rv = -100;
2208
2209 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2210 peer = mdev->p_uuid[UI_BITMAP] & 1;
2211
2212 switch (mdev->net_conf->after_sb_2p) {
2213 case ASB_DISCARD_YOUNGER_PRI:
2214 case ASB_DISCARD_OLDER_PRI:
2215 case ASB_DISCARD_LEAST_CHG:
2216 case ASB_DISCARD_LOCAL:
2217 case ASB_DISCARD_REMOTE:
2218 case ASB_CONSENSUS:
2219 case ASB_DISCARD_SECONDARY:
2220 dev_err(DEV, "Configuration error.\n");
2221 break;
2222 case ASB_VIOLENTLY:
2223 rv = drbd_asb_recover_0p(mdev);
2224 break;
2225 case ASB_DISCONNECT:
2226 break;
2227 case ASB_CALL_HELPER:
2228 hg = drbd_asb_recover_0p(mdev);
2229 if (hg == -1) {
2230 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2231 * we might be here in C_WF_REPORT_PARAMS which is transient.
2232 * we do not need to wait for the after state change work either. */
2233 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2234 if (self != SS_SUCCESS) {
2235 drbd_khelper(mdev, "pri-lost-after-sb");
2236 } else {
2237 dev_warn(DEV, "Successfully gave up primary role.\n");
2238 rv = hg;
2239 }
2240 } else
2241 rv = hg;
2242 }
2243
2244 return rv;
2245}
2246
2247static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2248 u64 bits, u64 flags)
2249{
2250 if (!uuid) {
2251 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2252 return;
2253 }
2254 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2255 text,
2256 (unsigned long long)uuid[UI_CURRENT],
2257 (unsigned long long)uuid[UI_BITMAP],
2258 (unsigned long long)uuid[UI_HISTORY_START],
2259 (unsigned long long)uuid[UI_HISTORY_END],
2260 (unsigned long long)bits,
2261 (unsigned long long)flags);
2262}
2263
2264/*
2265 100 after split brain try auto recover
2266 2 C_SYNC_SOURCE set BitMap
2267 1 C_SYNC_SOURCE use BitMap
2268 0 no Sync
2269 -1 C_SYNC_TARGET use BitMap
2270 -2 C_SYNC_TARGET set BitMap
2271 -100 after split brain, disconnect
2272-1000 unrelated data
2273 */
2274static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2275{
2276 u64 self, peer;
2277 int i, j;
2278
2279 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2280 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2281
2282 *rule_nr = 10;
2283 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2284 return 0;
2285
2286 *rule_nr = 20;
2287 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2288 peer != UUID_JUST_CREATED)
2289 return -2;
2290
2291 *rule_nr = 30;
2292 if (self != UUID_JUST_CREATED &&
2293 (peer == UUID_JUST_CREATED || peer == (u64)0))
2294 return 2;
2295
2296 if (self == peer) {
2297 int rct, dc; /* roles at crash time */
2298
2299 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2300
2301 if (mdev->agreed_pro_version < 91)
2302 return -1001;
2303
2304 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2305 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2306 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2307 drbd_uuid_set_bm(mdev, 0UL);
2308
2309 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2310 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2311 *rule_nr = 34;
2312 } else {
2313 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2314 *rule_nr = 36;
2315 }
2316
2317 return 1;
2318 }
2319
2320 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2321
2322 if (mdev->agreed_pro_version < 91)
2323 return -1001;
2324
2325 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2326 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2327 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2328
2329 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2330 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2331 mdev->p_uuid[UI_BITMAP] = 0UL;
2332
2333 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2334 *rule_nr = 35;
2335 } else {
2336 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2337 *rule_nr = 37;
2338 }
2339
2340 return -1;
2341 }
2342
2343 /* Common power [off|failure] */
2344 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2345 (mdev->p_uuid[UI_FLAGS] & 2);
2346 /* lowest bit is set when we were primary,
2347 * next bit (weight 2) is set when peer was primary */
2348 *rule_nr = 40;
2349
2350 switch (rct) {
2351 case 0: /* !self_pri && !peer_pri */ return 0;
2352 case 1: /* self_pri && !peer_pri */ return 1;
2353 case 2: /* !self_pri && peer_pri */ return -1;
2354 case 3: /* self_pri && peer_pri */
2355 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2356 return dc ? -1 : 1;
2357 }
2358 }
2359
2360 *rule_nr = 50;
2361 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2362 if (self == peer)
2363 return -1;
2364
2365 *rule_nr = 51;
2366 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2367 if (self == peer) {
2368 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2369 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2370 if (self == peer) {
2371 /* The last P_SYNC_UUID did not get though. Undo the last start of
2372 resync as sync source modifications of the peer's UUIDs. */
2373
2374 if (mdev->agreed_pro_version < 91)
2375 return -1001;
2376
2377 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2378 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2379 return -1;
2380 }
2381 }
2382
2383 *rule_nr = 60;
2384 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2385 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2386 peer = mdev->p_uuid[i] & ~((u64)1);
2387 if (self == peer)
2388 return -2;
2389 }
2390
2391 *rule_nr = 70;
2392 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2393 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2394 if (self == peer)
2395 return 1;
2396
2397 *rule_nr = 71;
2398 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2399 if (self == peer) {
2400 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2401 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2402 if (self == peer) {
2403 /* The last P_SYNC_UUID did not get though. Undo the last start of
2404 resync as sync source modifications of our UUIDs. */
2405
2406 if (mdev->agreed_pro_version < 91)
2407 return -1001;
2408
2409 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2410 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2411
2412 dev_info(DEV, "Undid last start of resync:\n");
2413
2414 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2415 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2416
2417 return 1;
2418 }
2419 }
2420
2421
2422 *rule_nr = 80;
2423 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2424 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2425 if (self == peer)
2426 return 2;
2427 }
2428
2429 *rule_nr = 90;
2430 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2431 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2432 if (self == peer && self != ((u64)0))
2433 return 100;
2434
2435 *rule_nr = 100;
2436 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2437 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2438 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2439 peer = mdev->p_uuid[j] & ~((u64)1);
2440 if (self == peer)
2441 return -100;
2442 }
2443 }
2444
2445 return -1000;
2446}
2447
2448/* drbd_sync_handshake() returns the new conn state on success, or
2449 CONN_MASK (-1) on failure.
2450 */
2451static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2452 enum drbd_disk_state peer_disk) __must_hold(local)
2453{
2454 int hg, rule_nr;
2455 enum drbd_conns rv = C_MASK;
2456 enum drbd_disk_state mydisk;
2457
2458 mydisk = mdev->state.disk;
2459 if (mydisk == D_NEGOTIATING)
2460 mydisk = mdev->new_state_tmp.disk;
2461
2462 dev_info(DEV, "drbd_sync_handshake:\n");
2463 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2464 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2465 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2466
2467 hg = drbd_uuid_compare(mdev, &rule_nr);
2468
2469 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2470
2471 if (hg == -1000) {
2472 dev_alert(DEV, "Unrelated data, aborting!\n");
2473 return C_MASK;
2474 }
2475 if (hg == -1001) {
2476 dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2477 return C_MASK;
2478 }
2479
2480 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2481 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2482 int f = (hg == -100) || abs(hg) == 2;
2483 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2484 if (f)
2485 hg = hg*2;
2486 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2487 hg > 0 ? "source" : "target");
2488 }
2489
2490 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2491 int pcount = (mdev->state.role == R_PRIMARY)
2492 + (peer_role == R_PRIMARY);
2493 int forced = (hg == -100);
2494
2495 switch (pcount) {
2496 case 0:
2497 hg = drbd_asb_recover_0p(mdev);
2498 break;
2499 case 1:
2500 hg = drbd_asb_recover_1p(mdev);
2501 break;
2502 case 2:
2503 hg = drbd_asb_recover_2p(mdev);
2504 break;
2505 }
2506 if (abs(hg) < 100) {
2507 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2508 "automatically solved. Sync from %s node\n",
2509 pcount, (hg < 0) ? "peer" : "this");
2510 if (forced) {
2511 dev_warn(DEV, "Doing a full sync, since"
2512 " UUIDs where ambiguous.\n");
2513 hg = hg*2;
2514 }
2515 }
2516 }
2517
2518 if (hg == -100) {
2519 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2520 hg = -1;
2521 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2522 hg = 1;
2523
2524 if (abs(hg) < 100)
2525 dev_warn(DEV, "Split-Brain detected, manually solved. "
2526 "Sync from %s node\n",
2527 (hg < 0) ? "peer" : "this");
2528 }
2529
2530 if (hg == -100) {
2531 dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2532 drbd_khelper(mdev, "split-brain");
2533 return C_MASK;
2534 }
2535
2536 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2537 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2538 return C_MASK;
2539 }
2540
2541 if (hg < 0 && /* by intention we do not use mydisk here. */
2542 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2543 switch (mdev->net_conf->rr_conflict) {
2544 case ASB_CALL_HELPER:
2545 drbd_khelper(mdev, "pri-lost");
2546 /* fall through */
2547 case ASB_DISCONNECT:
2548 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2549 return C_MASK;
2550 case ASB_VIOLENTLY:
2551 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2552 "assumption\n");
2553 }
2554 }
2555
2556 if (abs(hg) >= 2) {
2557 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2558 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2559 return C_MASK;
2560 }
2561
2562 if (hg > 0) { /* become sync source. */
2563 rv = C_WF_BITMAP_S;
2564 } else if (hg < 0) { /* become sync target */
2565 rv = C_WF_BITMAP_T;
2566 } else {
2567 rv = C_CONNECTED;
2568 if (drbd_bm_total_weight(mdev)) {
2569 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2570 drbd_bm_total_weight(mdev));
2571 }
2572 }
2573
2574 return rv;
2575}
2576
2577/* returns 1 if invalid */
2578static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2579{
2580 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2581 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2582 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2583 return 0;
2584
2585 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2586 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2587 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2588 return 1;
2589
2590 /* everything else is valid if they are equal on both sides. */
2591 if (peer == self)
2592 return 0;
2593
2594 /* everything es is invalid. */
2595 return 1;
2596}
2597
2598static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2599{
2600 struct p_protocol *p = (struct p_protocol *)h;
2601 int header_size, data_size;
2602 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2603 int p_want_lose, p_two_primaries;
2604 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2605
2606 header_size = sizeof(*p) - sizeof(*h);
2607 data_size = h->length - header_size;
2608
2609 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2610 return FALSE;
2611
2612 p_proto = be32_to_cpu(p->protocol);
2613 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2614 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2615 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2616 p_want_lose = be32_to_cpu(p->want_lose);
2617 p_two_primaries = be32_to_cpu(p->two_primaries);
2618
2619 if (p_proto != mdev->net_conf->wire_protocol) {
2620 dev_err(DEV, "incompatible communication protocols\n");
2621 goto disconnect;
2622 }
2623
2624 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2625 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2626 goto disconnect;
2627 }
2628
2629 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2630 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2631 goto disconnect;
2632 }
2633
2634 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2635 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2636 goto disconnect;
2637 }
2638
2639 if (p_want_lose && mdev->net_conf->want_lose) {
2640 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2641 goto disconnect;
2642 }
2643
2644 if (p_two_primaries != mdev->net_conf->two_primaries) {
2645 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2646 goto disconnect;
2647 }
2648
2649 if (mdev->agreed_pro_version >= 87) {
2650 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2651
2652 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2653 return FALSE;
2654
2655 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2656 if (strcmp(p_integrity_alg, my_alg)) {
2657 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2658 goto disconnect;
2659 }
2660 dev_info(DEV, "data-integrity-alg: %s\n",
2661 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2662 }
2663
2664 return TRUE;
2665
2666disconnect:
2667 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2668 return FALSE;
2669}
2670
2671/* helper function
2672 * input: alg name, feature name
2673 * return: NULL (alg name was "")
2674 * ERR_PTR(error) if something goes wrong
2675 * or the crypto hash ptr, if it worked out ok. */
2676struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2677 const char *alg, const char *name)
2678{
2679 struct crypto_hash *tfm;
2680
2681 if (!alg[0])
2682 return NULL;
2683
2684 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2685 if (IS_ERR(tfm)) {
2686 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2687 alg, name, PTR_ERR(tfm));
2688 return tfm;
2689 }
2690 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2691 crypto_free_hash(tfm);
2692 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2693 return ERR_PTR(-EINVAL);
2694 }
2695 return tfm;
2696}
2697
2698static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2699{
2700 int ok = TRUE;
2701 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2702 unsigned int header_size, data_size, exp_max_sz;
2703 struct crypto_hash *verify_tfm = NULL;
2704 struct crypto_hash *csums_tfm = NULL;
2705 const int apv = mdev->agreed_pro_version;
2706
2707 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2708 : apv == 88 ? sizeof(struct p_rs_param)
2709 + SHARED_SECRET_MAX
2710 : /* 89 */ sizeof(struct p_rs_param_89);
2711
2712 if (h->length > exp_max_sz) {
2713 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2714 h->length, exp_max_sz);
2715 return FALSE;
2716 }
2717
2718 if (apv <= 88) {
2719 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2720 data_size = h->length - header_size;
2721 } else /* apv >= 89 */ {
2722 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2723 data_size = h->length - header_size;
2724 D_ASSERT(data_size == 0);
2725 }
2726
2727 /* initialize verify_alg and csums_alg */
2728 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2729
2730 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2731 return FALSE;
2732
2733 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2734
2735 if (apv >= 88) {
2736 if (apv == 88) {
2737 if (data_size > SHARED_SECRET_MAX) {
2738 dev_err(DEV, "verify-alg too long, "
2739 "peer wants %u, accepting only %u byte\n",
2740 data_size, SHARED_SECRET_MAX);
2741 return FALSE;
2742 }
2743
2744 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2745 return FALSE;
2746
2747 /* we expect NUL terminated string */
2748 /* but just in case someone tries to be evil */
2749 D_ASSERT(p->verify_alg[data_size-1] == 0);
2750 p->verify_alg[data_size-1] = 0;
2751
2752 } else /* apv >= 89 */ {
2753 /* we still expect NUL terminated strings */
2754 /* but just in case someone tries to be evil */
2755 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2756 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2757 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2758 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2759 }
2760
2761 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2762 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2763 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2764 mdev->sync_conf.verify_alg, p->verify_alg);
2765 goto disconnect;
2766 }
2767 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2768 p->verify_alg, "verify-alg");
2769 if (IS_ERR(verify_tfm)) {
2770 verify_tfm = NULL;
2771 goto disconnect;
2772 }
2773 }
2774
2775 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2776 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2777 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2778 mdev->sync_conf.csums_alg, p->csums_alg);
2779 goto disconnect;
2780 }
2781 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2782 p->csums_alg, "csums-alg");
2783 if (IS_ERR(csums_tfm)) {
2784 csums_tfm = NULL;
2785 goto disconnect;
2786 }
2787 }
2788
2789
2790 spin_lock(&mdev->peer_seq_lock);
2791 /* lock against drbd_nl_syncer_conf() */
2792 if (verify_tfm) {
2793 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2794 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2795 crypto_free_hash(mdev->verify_tfm);
2796 mdev->verify_tfm = verify_tfm;
2797 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2798 }
2799 if (csums_tfm) {
2800 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2801 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2802 crypto_free_hash(mdev->csums_tfm);
2803 mdev->csums_tfm = csums_tfm;
2804 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2805 }
2806 spin_unlock(&mdev->peer_seq_lock);
2807 }
2808
2809 return ok;
2810disconnect:
2811 /* just for completeness: actually not needed,
2812 * as this is not reached if csums_tfm was ok. */
2813 crypto_free_hash(csums_tfm);
2814 /* but free the verify_tfm again, if csums_tfm did not work out */
2815 crypto_free_hash(verify_tfm);
2816 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2817 return FALSE;
2818}
2819
/*
 * drbd_setup_order_type() - placeholder, intentionally does nothing.
 * @mdev: DRBD device (ignored)
 * @peer: queue order type announced by the peer (ignored)
 *
 * Sorry, we currently have no working implementation of distributed TCQ.
 */
static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
{
}
2825
2826/* warn if the arguments differ by more than 12.5% */
2827static void warn_if_differ_considerably(struct drbd_conf *mdev,
2828 const char *s, sector_t a, sector_t b)
2829{
2830 sector_t d;
2831 if (a == 0 || b == 0)
2832 return;
2833 d = (a > b) ? (a - b) : (b - a);
2834 if (d > (a>>3) || d > (b>>3))
2835 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2836 (unsigned long long)a, (unsigned long long)b);
2837}
2838
2839static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2840{
2841 struct p_sizes *p = (struct p_sizes *)h;
2842 enum determine_dev_size dd = unchanged;
2843 unsigned int max_seg_s;
2844 sector_t p_size, p_usize, my_usize;
2845 int ldsc = 0; /* local disk size changed */
2846 enum drbd_conns nconn;
2847
2848 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2849 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2850 return FALSE;
2851
2852 p_size = be64_to_cpu(p->d_size);
2853 p_usize = be64_to_cpu(p->u_size);
2854
2855 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2856 dev_err(DEV, "some backing storage is needed\n");
2857 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2858 return FALSE;
2859 }
2860
2861 /* just store the peer's disk size for now.
2862 * we still need to figure out whether we accept that. */
2863 mdev->p_size = p_size;
2864
2865#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2866 if (get_ldev(mdev)) {
2867 warn_if_differ_considerably(mdev, "lower level device sizes",
2868 p_size, drbd_get_max_capacity(mdev->ldev));
2869 warn_if_differ_considerably(mdev, "user requested size",
2870 p_usize, mdev->ldev->dc.disk_size);
2871
2872 /* if this is the first connect, or an otherwise expected
2873 * param exchange, choose the minimum */
2874 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2875 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2876 p_usize);
2877
2878 my_usize = mdev->ldev->dc.disk_size;
2879
2880 if (mdev->ldev->dc.disk_size != p_usize) {
2881 mdev->ldev->dc.disk_size = p_usize;
2882 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2883 (unsigned long)mdev->ldev->dc.disk_size);
2884 }
2885
2886 /* Never shrink a device with usable data during connect.
2887 But allow online shrinking if we are connected. */
2888 if (drbd_new_dev_size(mdev, mdev->ldev) <
2889 drbd_get_capacity(mdev->this_bdev) &&
2890 mdev->state.disk >= D_OUTDATED &&
2891 mdev->state.conn < C_CONNECTED) {
2892 dev_err(DEV, "The peer's disk size is too small!\n");
2893 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2894 mdev->ldev->dc.disk_size = my_usize;
2895 put_ldev(mdev);
2896 return FALSE;
2897 }
2898 put_ldev(mdev);
2899 }
2900#undef min_not_zero
2901
2902 if (get_ldev(mdev)) {
2903 dd = drbd_determin_dev_size(mdev);
2904 put_ldev(mdev);
2905 if (dd == dev_size_error)
2906 return FALSE;
2907 drbd_md_sync(mdev);
2908 } else {
2909 /* I am diskless, need to accept the peer's size. */
2910 drbd_set_my_capacity(mdev, p_size);
2911 }
2912
2913 if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2914 nconn = drbd_sync_handshake(mdev,
2915 mdev->state.peer, mdev->state.pdsk);
2916 put_ldev(mdev);
2917
2918 if (nconn == C_MASK) {
2919 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2920 return FALSE;
2921 }
2922
2923 if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2924 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2925 return FALSE;
2926 }
2927 }
2928
2929 if (get_ldev(mdev)) {
2930 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2931 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2932 ldsc = 1;
2933 }
2934
2935 max_seg_s = be32_to_cpu(p->max_segment_size);
2936 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2937 drbd_setup_queue_param(mdev, max_seg_s);
2938
2939 drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2940 put_ldev(mdev);
2941 }
2942
2943 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2944 if (be64_to_cpu(p->c_size) !=
2945 drbd_get_capacity(mdev->this_bdev) || ldsc) {
2946 /* we have different sizes, probably peer
2947 * needs to know my new size... */
2948 drbd_send_sizes(mdev, 0);
2949 }
2950 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2951 (dd == grew && mdev->state.conn == C_CONNECTED)) {
2952 if (mdev->state.pdsk >= D_INCONSISTENT &&
2953 mdev->state.disk >= D_INCONSISTENT)
2954 resync_after_online_grow(mdev);
2955 else
2956 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2957 }
2958 }
2959
2960 return TRUE;
2961}
2962
2963static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2964{
2965 struct p_uuids *p = (struct p_uuids *)h;
2966 u64 *p_uuid;
2967 int i;
2968
2969 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2970 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2971 return FALSE;
2972
2973 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
2974
2975 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2976 p_uuid[i] = be64_to_cpu(p->uuid[i]);
2977
2978 kfree(mdev->p_uuid);
2979 mdev->p_uuid = p_uuid;
2980
2981 if (mdev->state.conn < C_CONNECTED &&
2982 mdev->state.disk < D_INCONSISTENT &&
2983 mdev->state.role == R_PRIMARY &&
2984 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2985 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2986 (unsigned long long)mdev->ed_uuid);
2987 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2988 return FALSE;
2989 }
2990
2991 if (get_ldev(mdev)) {
2992 int skip_initial_sync =
2993 mdev->state.conn == C_CONNECTED &&
2994 mdev->agreed_pro_version >= 90 &&
2995 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2996 (p_uuid[UI_FLAGS] & 8);
2997 if (skip_initial_sync) {
2998 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2999 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3000 "clear_n_write from receive_uuids");
3001 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3002 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3003 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3004 CS_VERBOSE, NULL);
3005 drbd_md_sync(mdev);
3006 }
3007 put_ldev(mdev);
3008 }
3009
3010 /* Before we test for the disk state, we should wait until an eventually
3011 ongoing cluster wide state change is finished. That is important if
3012 we are primary and are detaching from our disk. We need to see the
3013 new disk state... */
3014 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3015 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3016 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3017
3018 return TRUE;
3019}
3020
3021/**
3022 * convert_state() - Converts the peer's view of the cluster state to our point of view
3023 * @ps: The state as seen by the peer.
3024 */
3025static union drbd_state convert_state(union drbd_state ps)
3026{
3027 union drbd_state ms;
3028
3029 static enum drbd_conns c_tab[] = {
3030 [C_CONNECTED] = C_CONNECTED,
3031
3032 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3033 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3034 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3035 [C_VERIFY_S] = C_VERIFY_T,
3036 [C_MASK] = C_MASK,
3037 };
3038
3039 ms.i = ps.i;
3040
3041 ms.conn = c_tab[ps.conn];
3042 ms.peer = ps.role;
3043 ms.role = ps.peer;
3044 ms.pdsk = ps.disk;
3045 ms.disk = ps.pdsk;
3046 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3047
3048 return ms;
3049}
3050
3051static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3052{
3053 struct p_req_state *p = (struct p_req_state *)h;
3054 union drbd_state mask, val;
3055 int rv;
3056
3057 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3058 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3059 return FALSE;
3060
3061 mask.i = be32_to_cpu(p->mask);
3062 val.i = be32_to_cpu(p->val);
3063
3064 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3065 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3066 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3067 return TRUE;
3068 }
3069
3070 mask = convert_state(mask);
3071 val = convert_state(val);
3072
3073 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3074
3075 drbd_send_sr_reply(mdev, rv);
3076 drbd_md_sync(mdev);
3077
3078 return TRUE;
3079}
3080
/**
 * receive_state() - handle the peer's state packet
 * @mdev:	DRBD device.
 * @h:		packet header that was already received; the payload is
 *		received here, directly behind it.
 *
 * Takes over the peer's role, disk state and sync-pause flags into our
 * state.  Where a resync has to be considered, drbd_sync_handshake()
 * determines the new connection state.
 *
 * Return: TRUE on success.  FALSE on receive errors; FALSE after forcing
 * C_DISCONNECTING if the handshake or the state change failed.
 */
static int receive_state(struct drbd_conf *mdev, struct p_header *h)
{
	struct p_state *p = (struct p_state *)h;
	enum drbd_conns nconn, oconn;
	union drbd_state ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	int rv;

	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
		return FALSE;

	if (drbd_recv(mdev, h->payload, h->length) != h->length)
		return FALSE;

	peer_state.i = be32_to_cpu(p->state);

	/* A negotiating peer has not settled on a disk state yet; derive
	 * the state to work with from bit 4 of the UUID flags it sent.
	 * NOTE(review): p_uuid is dereferenced without a NULL check here,
	 * while the code further down tests mdev->p_uuid first -- confirm
	 * that the UUIDs always arrive before a state packet. */
	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	/* Sample the connection state under the lock.  The decision below
	 * is made without the lock; if the connection state changed in the
	 * meantime, we jump back here (with the lock held) and start over. */
	spin_lock_irq(&mdev->req_lock);
 retry:
	oconn = nconn = mdev->state.conn;
	spin_unlock_irq(&mdev->req_lock);

	if (nconn == C_WF_REPORT_PARAMS)
		nconn = C_CONNECTED;

	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (oconn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (oconn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			mdev->state.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (oconn == C_CONNECTED &&
				(peer_state.conn >= C_STARTING_SYNC_S &&
				 peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);

		put_ldev(mdev);
		/* the handshake failed: who has to give up depends on who
		 * was in the process of attaching a disk */
		if (nconn == C_MASK) {
			if (mdev->state.disk == D_NEGOTIATING) {
				drbd_force_state(mdev, NS(disk, D_DISKLESS));
				nconn = C_CONNECTED;
			} else if (peer_state.disk == D_NEGOTIATING) {
				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
			} else {
				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return FALSE;
			}
		}
	}

	spin_lock_irq(&mdev->req_lock);
	/* our decision was based on oconn; redo it if that is outdated */
	if (mdev->state.conn != oconn)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &mdev->flags);
	ns.i = mdev->state.i;
	ns.conn = nconn;
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	/* leave D_NEGOTIATING for the disk state that was kept in
	 * new_state_tmp */
	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = mdev->new_state_tmp.disk;

	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
	/* re-read the state that is in effect now */
	ns = mdev->state;
	spin_unlock_irq(&mdev->req_lock);

	if (rv < SS_SUCCESS) {
		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		return FALSE;
	}

	if (oconn > C_WF_REPORT_PARAMS) {
		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(mdev);
			drbd_send_state(mdev);
		}
	}

	/* the want_lose request is one-shot: reset it once a state packet
	 * was processed successfully */
	mdev->net_conf->want_lose = 0;

	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */

	return TRUE;
}
3188
3189static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3190{
3191 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3192
3193 wait_event(mdev->misc_wait,
3194 mdev->state.conn == C_WF_SYNC_UUID ||
3195 mdev->state.conn < C_CONNECTED ||
3196 mdev->state.disk < D_NEGOTIATING);
3197
3198 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3199
3200 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3201 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3202 return FALSE;
3203
3204 /* Here the _drbd_uuid_ functions are right, current should
3205 _not_ be rotated into the history */
3206 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3207 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3208 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3209
3210 drbd_start_resync(mdev, C_SYNC_TARGET);
3211
3212 put_ldev(mdev);
3213 } else
3214 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3215
3216 return TRUE;
3217}
3218
/* Result of handling one bitmap packet:
 *   OK     - packet processed, more packets are expected
 *   DONE   - the bitmap transfer is complete
 *   FAILED - receive or decode error */
enum receive_bitmap_ret { OK, DONE, FAILED };
3220
3221static enum receive_bitmap_ret
3222receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3223 unsigned long *buffer, struct bm_xfer_ctx *c)
3224{
3225 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3226 unsigned want = num_words * sizeof(long);
3227
3228 if (want != h->length) {
3229 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3230 return FAILED;
3231 }
3232 if (want == 0)
3233 return DONE;
3234 if (drbd_recv(mdev, buffer, want) != want)
3235 return FAILED;
3236
3237 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3238
3239 c->word_offset += num_words;
3240 c->bit_offset = c->word_offset * BITS_PER_LONG;
3241 if (c->bit_offset > c->bm_bits)
3242 c->bit_offset = c->bm_bits;
3243
3244 return OK;
3245}
3246
/*
 * Decode one compressed bitmap packet whose payload is a bit stream of
 * variable-length-encoded run lengths.
 *
 * Runs alternate between "leave as is" and "set these bits"; the value
 * from DCBP_get_start(p) selects which kind the first run is. Decoding
 * starts at c->bit_offset and c is advanced past the decoded runs.
 *
 * Returns DONE if decoding ended exactly at c->bm_bits, OK if the packet
 * was decoded but the end of the bitmap was not reached, FAILED on any
 * bit stream / decoding error or if a run would exceed the bitmap.
 */
static enum receive_bitmap_ret
recv_bm_rle_bits(struct drbd_conf *mdev,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c)
{
	struct bitstream bs;
	u64 look_ahead;		/* bits fetched from the stream, not yet decoded */
	u64 rl;			/* run length decoded in the current iteration */
	u64 tmp;
	unsigned long s = c->bit_offset;	/* first bit of the current run */
	unsigned long e;			/* last bit of the current run */
	/* number of code bytes following the fixed part of *p */
	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
	int toggle = DCBP_get_start(p);	/* non-zero: current run is set bits */
	int have;			/* number of valid bits in look_ahead */
	int bits;

	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));

	/* prime the look-ahead buffer with up to 64 bits */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return FAILED;

	/* each iteration decodes one run, then flips the run type */
	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return FAILED;

		if (toggle) {
			e = s + rl -1;
			if (e >= c->bm_bits) {
				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return FAILED;
			}
			_drbd_bm_set_bits(mdev, s, e);
		}

		/* the code word must not be longer than what we actually had */
		if (have < bits) {
			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return FAILED;
		}
		/* drop the consumed code word ... */
		look_ahead >>= bits;
		have -= bits;

		/* ... and refill the look-ahead buffer back up towards 64 bits */
		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return FAILED;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	return (s == c->bm_bits) ? DONE : OK;
}
3305
3306static enum receive_bitmap_ret
3307decode_bitmap_c(struct drbd_conf *mdev,
3308 struct p_compressed_bm *p,
3309 struct bm_xfer_ctx *c)
3310{
3311 if (DCBP_get_code(p) == RLE_VLI_Bits)
3312 return recv_bm_rle_bits(mdev, p, c);
3313
3314 /* other variants had been implemented for evaluation,
3315 * but have been dropped as this one turned out to be "best"
3316 * during all our tests. */
3317
3318 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3319 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3320 return FAILED;
3321}
3322
3323void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3324 const char *direction, struct bm_xfer_ctx *c)
3325{
3326 /* what would it take to transfer it "plaintext" */
3327 unsigned plain = sizeof(struct p_header) *
3328 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3329 + c->bm_words * sizeof(long);
3330 unsigned total = c->bytes[0] + c->bytes[1];
3331 unsigned r;
3332
3333 /* total can not be zero. but just in case: */
3334 if (total == 0)
3335 return;
3336
3337 /* don't report if not compressed */
3338 if (total >= plain)
3339 return;
3340
3341 /* total < plain. check for overflow, still */
3342 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3343 : (1000 * total / plain);
3344
3345 if (r > 1000)
3346 r = 1000;
3347
3348 r = 1000 - r;
3349 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3350 "total %u; compression: %u.%u%%\n",
3351 direction,
3352 c->bytes[1], c->packets[1],
3353 c->bytes[0], c->packets[0],
3354 total, r/10, r % 10);
3355}
3356
3357/* Since we are processing the bitfield from lower addresses to higher,
3358 it does not matter if the process it in 32 bit chunks or 64 bit
3359 chunks as long as it is little endian. (Understand it as byte stream,
3360 beginning with the lowest byte...) If we would use big endian
3361 we would need to process it from the highest address to the lowest,
3362 in order to be agnostic to the 32 vs 64 bits issue.
3363
3364 returns 0 on failure, 1 if we successfully received it. */
3365static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3366{
3367 struct bm_xfer_ctx c;
3368 void *buffer;
3369 enum receive_bitmap_ret ret;
3370 int ok = FALSE;
3371
3372 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3373
3374 drbd_bm_lock(mdev, "receive bitmap");
3375
3376 /* maybe we should use some per thread scratch page,
3377 * and allocate that during initial device creation? */
3378 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3379 if (!buffer) {
3380 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3381 goto out;
3382 }
3383
3384 c = (struct bm_xfer_ctx) {
3385 .bm_bits = drbd_bm_bits(mdev),
3386 .bm_words = drbd_bm_words(mdev),
3387 };
3388
3389 do {
3390 if (h->command == P_BITMAP) {
3391 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3392 } else if (h->command == P_COMPRESSED_BITMAP) {
3393 /* MAYBE: sanity check that we speak proto >= 90,
3394 * and the feature is enabled! */
3395 struct p_compressed_bm *p;
3396
3397 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3398 dev_err(DEV, "ReportCBitmap packet too large\n");
3399 goto out;
3400 }
3401 /* use the page buff */
3402 p = buffer;
3403 memcpy(p, h, sizeof(*h));
3404 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3405 goto out;
3406 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3407 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3408 return FAILED;
3409 }
3410 ret = decode_bitmap_c(mdev, p, &c);
3411 } else {
3412 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3413 goto out;
3414 }
3415
3416 c.packets[h->command == P_BITMAP]++;
3417 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3418
3419 if (ret != OK)
3420 break;
3421
3422 if (!drbd_recv_header(mdev, h))
3423 goto out;
3424 } while (ret == OK);
3425 if (ret == FAILED)
3426 goto out;
3427
3428 INFO_bm_xfer_stats(mdev, "receive", &c);
3429
3430 if (mdev->state.conn == C_WF_BITMAP_T) {
3431 ok = !drbd_send_bitmap(mdev);
3432 if (!ok)
3433 goto out;
3434 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3435 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3436 D_ASSERT(ok == SS_SUCCESS);
3437 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3438 /* admin may have requested C_DISCONNECTING,
3439 * other threads may have noticed network errors */
3440 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3441 drbd_conn_str(mdev->state.conn));
3442 }
3443
3444 ok = TRUE;
3445 out:
3446 drbd_bm_unlock(mdev);
3447 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3448 drbd_start_resync(mdev, C_SYNC_SOURCE);
3449 free_page((unsigned long) buffer);
3450 return ok;
3451}
3452
3453static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3454{
3455 /* TODO zero copy sink :) */
3456 static char sink[128];
3457 int size, want, r;
3458
3459 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3460 h->command, h->length);
3461
3462 size = h->length;
3463 while (size > 0) {
3464 want = min_t(int, size, sizeof(sink));
3465 r = drbd_recv(mdev, sink, want);
3466 ERR_IF(r <= 0) break;
3467 size -= r;
3468 }
3469 return size == 0;
3470}
3471
3472static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3473{
3474 if (mdev->state.disk >= D_INCONSISTENT)
3475 drbd_kick_lo(mdev);
3476
3477 /* Make sure we've acked all the TCP data associated
3478 * with the data requests being unplugged */
3479 drbd_tcp_quickack(mdev->data.socket);
3480
3481 return TRUE;
3482}
3483
/* Signature of a receiver-side packet handler: gets the device and the
 * already received packet header; drbdd() treats a zero return value as
 * a failure to receive/process the packet. */
typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);

/* Packet handlers of the receiver thread, indexed by packet command.
 * Several commands share one handler (e.g. both bitmap packet types,
 * and all the request types handled by receive_DataRequest).
 * Entries not listed here are NULL. */
static drbd_cmd_handler_f drbd_default_handler[] = {
	[P_DATA]	    = receive_Data,
	[P_DATA_REPLY]	    = receive_DataReply,
	[P_RS_DATA_REPLY]   = receive_RSDataReply,
	[P_BARRIER]	    = receive_Barrier,
	[P_BITMAP]	    = receive_bitmap,
	[P_COMPRESSED_BITMAP]    = receive_bitmap,
	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
	[P_DATA_REQUEST]    = receive_DataRequest,
	[P_RS_DATA_REQUEST] = receive_DataRequest,
	[P_SYNC_PARAM]	    = receive_SyncParam,
	[P_SYNC_PARAM89]	   = receive_SyncParam,
	[P_PROTOCOL]        = receive_protocol,
	[P_UUIDS]	    = receive_uuids,
	[P_SIZES]	    = receive_sizes,
	[P_STATE]	    = receive_state,
	[P_STATE_CHG_REQ]   = receive_req_state,
	[P_SYNC_UUID]       = receive_sync_uuid,
	[P_OV_REQUEST]      = receive_DataRequest,
	[P_OV_REPLY]        = receive_DataRequest,
	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
	/* anything missing from this table is in
	 * the asender_tbl, see get_asender_cmd */
	[P_MAX_CMD]	    = NULL,
};

/* Table used by drbdd() for commands below P_MAX_CMD. */
static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
/* Table used by drbdd() for commands between P_MAY_IGNORE and
 * P_MAX_OPT_CMD; it has no initializer here, so it starts out NULL. */
static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3514
3515static void drbdd(struct drbd_conf *mdev)
3516{
3517 drbd_cmd_handler_f handler;
3518 struct p_header *header = &mdev->data.rbuf.header;
3519
3520 while (get_t_state(&mdev->receiver) == Running) {
3521 drbd_thread_current_set_cpu(mdev);
3522 if (!drbd_recv_header(mdev, header))
3523 break;
3524
3525 if (header->command < P_MAX_CMD)
3526 handler = drbd_cmd_handler[header->command];
3527 else if (P_MAY_IGNORE < header->command
3528 && header->command < P_MAX_OPT_CMD)
3529 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3530 else if (header->command > P_MAX_OPT_CMD)
3531 handler = receive_skip;
3532 else
3533 handler = NULL;
3534
3535 if (unlikely(!handler)) {
3536 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3537 header->command, header->length);
3538 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3539 break;
3540 }
3541 if (unlikely(!handler(mdev, header))) {
3542 dev_err(DEV, "error receiving %s, l: %d!\n",
3543 cmdname(header->command), header->length);
3544 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3545 break;
3546 }
3547
3548 trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf,
3549 __FILE__, __LINE__);
3550 }
3551}
3552
3553static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3554{
3555 struct hlist_head *slot;
3556 struct hlist_node *pos;
3557 struct hlist_node *tmp;
3558 struct drbd_request *req;
3559 int i;
3560
3561 /*
3562 * Application READ requests
3563 */
3564 spin_lock_irq(&mdev->req_lock);
3565 for (i = 0; i < APP_R_HSIZE; i++) {
3566 slot = mdev->app_reads_hash+i;
3567 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3568 /* it may (but should not any longer!)
3569 * be on the work queue; if that assert triggers,
3570 * we need to also grab the
3571 * spin_lock_irq(&mdev->data.work.q_lock);
3572 * and list_del_init here. */
3573 D_ASSERT(list_empty(&req->w.list));
3574 /* It would be nice to complete outside of spinlock.
3575 * But this is easier for now. */
3576 _req_mod(req, connection_lost_while_pending);
3577 }
3578 }
3579 for (i = 0; i < APP_R_HSIZE; i++)
3580 if (!hlist_empty(mdev->app_reads_hash+i))
3581 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3582 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3583
3584 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3585 spin_unlock_irq(&mdev->req_lock);
3586}
3587
3588void drbd_flush_workqueue(struct drbd_conf *mdev)
3589{
3590 struct drbd_wq_barrier barr;
3591
3592 barr.w.cb = w_prev_work_done;
3593 init_completion(&barr.done);
3594 drbd_queue_work(&mdev->data.work, &barr.w);
3595 wait_for_completion(&barr.done);
3596}
3597
/*
 * Clean up after the connection to the peer went away.
 *
 * Does nothing if the connection state is C_STANDALONE. Otherwise, in
 * this order: stops the asender thread, frees the sockets, waits for the
 * active/sync/read ee lists to drain, resets the resync bookkeeping,
 * stops the resync timer, waits for local_cnt to reach zero, flushes the
 * worker queue, processes done_ee, drops the peer's UUIDs, clears the
 * transfer log (unless state.susp is set), fails pending application
 * reads and syncs the meta data.
 *
 * Afterwards the connection state is set to C_UNCONNECTED if it was at
 * least C_UNCONNECTED. If it was C_DISCONNECTING instead, the hash
 * tables, the HMAC transform and net_conf are freed and C_STANDALONE is
 * requested.
 */
static void drbd_disconnect(struct drbd_conf *mdev)
{
	enum drbd_fencing_p fp;
	union drbd_state os, ns;
	/* NOTE: rv is assigned below but not evaluated in this function */
	int rv = SS_UNKNOWN_ERROR;
	unsigned int i;

	if (mdev->state.conn == C_STANDALONE)
		return;
	/* we only expect to get here in a state below C_WF_CONNECTION */
	if (mdev->state.conn >= C_WF_CONNECTION)
		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
				drbd_conn_str(mdev->state.conn));

	/* asender does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&mdev->asender);

	mutex_lock(&mdev->data.mutex);
	drbd_free_sock(mdev);
	mutex_unlock(&mdev->data.mutex);

	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(mdev);
	mdev->rs_total = 0;
	mdev->rs_failed = 0;
	atomic_set(&mdev->rs_pending_cnt, 0);
	wake_up(&mdev->misc_wait);

	/* make sure syncer is stopped and w_resume_next_sg queued */
	del_timer_sync(&mdev->resync_timer);
	set_bit(STOP_SYNC_TIMER, &mdev->flags);
	/* call the timer function directly, with the stop flag set */
	resync_timer_fn((unsigned long)mdev);

	/* so we can be sure that all remote or resync reads
	 * made it at least to net_ee */
	wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(mdev);

	/* This also does reclaim_net_ee().  If we do this too early, we might
	 * miss some resync ee and pages.*/
	drbd_process_done_ee(mdev);

	/* forget the UUIDs received from the peer */
	kfree(mdev->p_uuid);
	mdev->p_uuid = NULL;

	if (!mdev->state.susp)
		tl_clear(mdev);

	drbd_fail_pending_reads(mdev);

	dev_info(DEV, "Connection closed\n");

	drbd_md_sync(mdev);

	/* read the fencing policy, if a local disk is available */
	fp = FP_DONT_CARE;
	if (get_ldev(mdev)) {
		fp = mdev->ldev->dc.fencing;
		put_ldev(mdev);
	}

	/* as primary with a fencing policy of at least FP_RESOURCE:
	 * try to outdate the peer and record the resulting peer disk state */
	if (mdev->state.role == R_PRIMARY) {
		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
			drbd_request_state(mdev, NS(pdsk, nps));
		}
	}

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;
	if (os.conn >= C_UNCONNECTED) {
		/* Do not restart in case we are C_DISCONNECTING */
		ns = os;
		ns.conn = C_UNCONNECTED;
		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	}
	spin_unlock_irq(&mdev->req_lock);

	/* os is the state sampled under the lock above */
	if (os.conn == C_DISCONNECTING) {
		struct hlist_head *h;
		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);

		/* we must not free the tl_hash
		 * while application io is still on the fly */
		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);

		spin_lock_irq(&mdev->req_lock);
		/* paranoia code */
		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
			if (h->first)
				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
						(int)(h - mdev->ee_hash), h->first);
		kfree(mdev->ee_hash);
		mdev->ee_hash = NULL;
		mdev->ee_hash_s = 0;

		/* paranoia code */
		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
			if (h->first)
				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
						(int)(h - mdev->tl_hash), h->first);
		kfree(mdev->tl_hash);
		mdev->tl_hash = NULL;
		mdev->tl_hash_s = 0;
		spin_unlock_irq(&mdev->req_lock);

		crypto_free_hash(mdev->cram_hmac_tfm);
		mdev->cram_hmac_tfm = NULL;

		kfree(mdev->net_conf);
		mdev->net_conf = NULL;
		drbd_request_state(mdev, NS(conn, C_STANDALONE));
	}

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_release_ee(mdev, &mdev->net_ee);
	if (i)
		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&mdev->pp_in_use);
	if (i)
		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);

	/* by now none of the ee lists should hold any entries */
	D_ASSERT(list_empty(&mdev->read_ee));
	D_ASSERT(list_empty(&mdev->active_ee));
	D_ASSERT(list_empty(&mdev->sync_ee));
	D_ASSERT(list_empty(&mdev->done_ee));

	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&mdev->current_epoch->epoch_size, 0);
	D_ASSERT(list_empty(&mdev->current_epoch->list));
}
3752
3753/*
3754 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3755 * we can agree on is stored in agreed_pro_version.
3756 *
3757 * feature flags and the reserved array should be enough room for future
3758 * enhancements of the handshake protocol, and possible plugins...
3759 *
3760 * for now, they are expected to be zero, but ignored.
3761 */
3762static int drbd_send_handshake(struct drbd_conf *mdev)
3763{
3764 /* ASSERT current == mdev->receiver ... */
3765 struct p_handshake *p = &mdev->data.sbuf.handshake;
3766 int ok;
3767
3768 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3769 dev_err(DEV, "interrupted during initial handshake\n");
3770 return 0; /* interrupted. not ok. */
3771 }
3772
3773 if (mdev->data.socket == NULL) {
3774 mutex_unlock(&mdev->data.mutex);
3775 return 0;
3776 }
3777
3778 memset(p, 0, sizeof(*p));
3779 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3780 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3781 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3782 (struct p_header *)p, sizeof(*p), 0 );
3783 mutex_unlock(&mdev->data.mutex);
3784 return ok;
3785}
3786
3787/*
3788 * return values:
3789 * 1 yes, we have a valid connection
3790 * 0 oops, did not work out, please try again
3791 * -1 peer talks different language,
3792 * no point in trying again, please go standalone.
3793 */
3794static int drbd_do_handshake(struct drbd_conf *mdev)
3795{
3796 /* ASSERT current == mdev->receiver ... */
3797 struct p_handshake *p = &mdev->data.rbuf.handshake;
3798 const int expect = sizeof(struct p_handshake)
3799 -sizeof(struct p_header);
3800 int rv;
3801
3802 rv = drbd_send_handshake(mdev);
3803 if (!rv)
3804 return 0;
3805
3806 rv = drbd_recv_header(mdev, &p->head);
3807 if (!rv)
3808 return 0;
3809
3810 if (p->head.command != P_HAND_SHAKE) {
3811 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3812 cmdname(p->head.command), p->head.command);
3813 return -1;
3814 }
3815
3816 if (p->head.length != expect) {
3817 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3818 expect, p->head.length);
3819 return -1;
3820 }
3821
3822 rv = drbd_recv(mdev, &p->head.payload, expect);
3823
3824 if (rv != expect) {
3825 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3826 return 0;
3827 }
3828
3829 trace_drbd_packet(mdev, mdev->data.socket, 2, &mdev->data.rbuf,
3830 __FILE__, __LINE__);
3831
3832 p->protocol_min = be32_to_cpu(p->protocol_min);
3833 p->protocol_max = be32_to_cpu(p->protocol_max);
3834 if (p->protocol_max == 0)
3835 p->protocol_max = p->protocol_min;
3836
3837 if (PRO_VERSION_MAX < p->protocol_min ||
3838 PRO_VERSION_MIN > p->protocol_max)
3839 goto incompat;
3840
3841 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3842
3843 dev_info(DEV, "Handshake successful: "
3844 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3845
3846 return 1;
3847
3848 incompat:
3849 dev_err(DEV, "incompatible DRBD dialects: "
3850 "I support %d-%d, peer supports %d-%d\n",
3851 PRO_VERSION_MIN, PRO_VERSION_MAX,
3852 p->protocol_min, p->protocol_max);
3853 return -1;
3854}
3855
3856#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant compiled when neither CONFIG_CRYPTO_HMAC nor
 * CONFIG_CRYPTO_HMAC_MODULE is defined: authentication is not available,
 * so this only logs how to fix the configuration and returns 0.
 */
static int drbd_do_auth(struct drbd_conf *mdev)
{
	dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return 0;
}
3863#else
3864#define CHALLENGE_LEN 64
3865static int drbd_do_auth(struct drbd_conf *mdev)
3866{
3867 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
3868 struct scatterlist sg;
3869 char *response = NULL;
3870 char *right_response = NULL;
3871 char *peers_ch = NULL;
3872 struct p_header p;
3873 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3874 unsigned int resp_size;
3875 struct hash_desc desc;
3876 int rv;
3877
3878 desc.tfm = mdev->cram_hmac_tfm;
3879 desc.flags = 0;
3880
3881 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3882 (u8 *)mdev->net_conf->shared_secret, key_len);
3883 if (rv) {
3884 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3885 rv = 0;
3886 goto fail;
3887 }
3888
3889 get_random_bytes(my_challenge, CHALLENGE_LEN);
3890
3891 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3892 if (!rv)
3893 goto fail;
3894
3895 rv = drbd_recv_header(mdev, &p);
3896 if (!rv)
3897 goto fail;
3898
3899 if (p.command != P_AUTH_CHALLENGE) {
3900 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3901 cmdname(p.command), p.command);
3902 rv = 0;
3903 goto fail;
3904 }
3905
3906 if (p.length > CHALLENGE_LEN*2) {
3907 dev_err(DEV, "expected AuthChallenge payload too big.\n");
3908 rv = 0;
3909 goto fail;
3910 }
3911
3912 peers_ch = kmalloc(p.length, GFP_NOIO);
3913 if (peers_ch == NULL) {
3914 dev_err(DEV, "kmalloc of peers_ch failed\n");
3915 rv = 0;
3916 goto fail;
3917 }
3918
3919 rv = drbd_recv(mdev, peers_ch, p.length);
3920
3921 if (rv != p.length) {
3922 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3923 rv = 0;
3924 goto fail;
3925 }
3926
3927 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3928 response = kmalloc(resp_size, GFP_NOIO);
3929 if (response == NULL) {
3930 dev_err(DEV, "kmalloc of response failed\n");
3931 rv = 0;
3932 goto fail;
3933 }
3934
3935 sg_init_table(&sg, 1);
3936 sg_set_buf(&sg, peers_ch, p.length);
3937
3938 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3939 if (rv) {
3940 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3941 rv = 0;
3942 goto fail;
3943 }
3944
3945 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3946 if (!rv)
3947 goto fail;
3948
3949 rv = drbd_recv_header(mdev, &p);
3950 if (!rv)
3951 goto fail;
3952
3953 if (p.command != P_AUTH_RESPONSE) {
3954 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3955 cmdname(p.command), p.command);
3956 rv = 0;
3957 goto fail;
3958 }
3959
3960 if (p.length != resp_size) {
3961 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
3962 rv = 0;
3963 goto fail;
3964 }
3965
3966 rv = drbd_recv(mdev, response , resp_size);
3967
3968 if (rv != resp_size) {
3969 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3970 rv = 0;
3971 goto fail;
3972 }
3973
3974 right_response = kmalloc(resp_size, GFP_NOIO);
3975 if (response == NULL) {
3976 dev_err(DEV, "kmalloc of right_response failed\n");
3977 rv = 0;
3978 goto fail;
3979 }
3980
3981 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3982
3983 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3984 if (rv) {
3985 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3986 rv = 0;
3987 goto fail;
3988 }
3989
3990 rv = !memcmp(response, right_response, resp_size);
3991
3992 if (rv)
3993 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
3994 resp_size, mdev->net_conf->cram_hmac_alg);
3995
3996 fail:
3997 kfree(peers_ch);
3998 kfree(response);
3999 kfree(right_response);
4000
4001 return rv;
4002}
4003#endif
4004
4005int drbdd_init(struct drbd_thread *thi)
4006{
4007 struct drbd_conf *mdev = thi->mdev;
4008 unsigned int minor = mdev_to_minor(mdev);
4009 int h;
4010
4011 sprintf(current->comm, "drbd%d_receiver", minor);
4012
4013 dev_info(DEV, "receiver (re)started\n");
4014
4015 do {
4016 h = drbd_connect(mdev);
4017 if (h == 0) {
4018 drbd_disconnect(mdev);
4019 __set_current_state(TASK_INTERRUPTIBLE);
4020 schedule_timeout(HZ);
4021 }
4022 if (h == -1) {
4023 dev_warn(DEV, "Discarding network configuration.\n");
4024 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4025 }
4026 } while (h == 0);
4027
4028 if (h > 0) {
4029 if (get_net_conf(mdev)) {
4030 drbdd(mdev);
4031 put_net_conf(mdev);
4032 }
4033 }
4034
4035 drbd_disconnect(mdev);
4036
4037 dev_info(DEV, "receiver terminated\n");
4038 return 0;
4039}
4040
4041/* ********* acknowledge sender ******** */
4042
4043static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4044{
4045 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4046
4047 int retcode = be32_to_cpu(p->retcode);
4048
4049 if (retcode >= SS_SUCCESS) {
4050 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4051 } else {
4052 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4053 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4054 drbd_set_st_err_str(retcode), retcode);
4055 }
4056 wake_up(&mdev->state_wait);
4057
4058 return TRUE;
4059}
4060
/* Answer a ping from the peer; returns the result of sending the ping ack. */
static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
{
	return drbd_send_ping_ack(mdev);
}
4066
4067static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4068{
4069 /* restore idle timeout */
4070 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4071
4072 return TRUE;
4073}
4074
4075static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4076{
4077 struct p_block_ack *p = (struct p_block_ack *)h;
4078 sector_t sector = be64_to_cpu(p->sector);
4079 int blksize = be32_to_cpu(p->blksize);
4080
4081 D_ASSERT(mdev->agreed_pro_version >= 89);
4082
4083 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4084
4085 drbd_rs_complete_io(mdev, sector);
4086 drbd_set_in_sync(mdev, sector, blksize);
4087 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4088 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4089 dec_rs_pending(mdev);
4090
4091 return TRUE;
4092}
4093
4094/* when we receive the ACK for a write request,
4095 * verify that we actually know about it */
4096static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4097 u64 id, sector_t sector)
4098{
4099 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4100 struct hlist_node *n;
4101 struct drbd_request *req;
4102
4103 hlist_for_each_entry(req, n, slot, colision) {
4104 if ((unsigned long)req == (unsigned long)id) {
4105 if (req->sector != sector) {
4106 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4107 "wrong sector (%llus versus %llus)\n", req,
4108 (unsigned long long)req->sector,
4109 (unsigned long long)sector);
4110 break;
4111 }
4112 return req;
4113 }
4114 }
4115 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4116 (void *)(unsigned long)id, (unsigned long long)sector);
4117 return NULL;
4118}
4119
/* Function type used to translate a block id / sector pair received from
 * the peer back into a request; returns NULL if the pair is not valid
 * (see validate_req_change_req_state()). */
typedef struct drbd_request *(req_validator_fn)
	(struct drbd_conf *mdev, u64 id, sector_t sector);
4122
4123static int validate_req_change_req_state(struct drbd_conf *mdev,
4124 u64 id, sector_t sector, req_validator_fn validator,
4125 const char *func, enum drbd_req_event what)
4126{
4127 struct drbd_request *req;
4128 struct bio_and_error m;
4129
4130 spin_lock_irq(&mdev->req_lock);
4131 req = validator(mdev, id, sector);
4132 if (unlikely(!req)) {
4133 spin_unlock_irq(&mdev->req_lock);
4134 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4135 return FALSE;
4136 }
4137 __req_mod(req, what, &m);
4138 spin_unlock_irq(&mdev->req_lock);
4139
4140 if (m.bio)
4141 complete_master_bio(mdev, &m);
4142 return TRUE;
4143}
4144
4145static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4146{
4147 struct p_block_ack *p = (struct p_block_ack *)h;
4148 sector_t sector = be64_to_cpu(p->sector);
4149 int blksize = be32_to_cpu(p->blksize);
4150 enum drbd_req_event what;
4151
4152 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4153
4154 if (is_syncer_block_id(p->block_id)) {
4155 drbd_set_in_sync(mdev, sector, blksize);
4156 dec_rs_pending(mdev);
4157 return TRUE;
4158 }
4159 switch (be16_to_cpu(h->command)) {
4160 case P_RS_WRITE_ACK:
4161 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4162 what = write_acked_by_peer_and_sis;
4163 break;
4164 case P_WRITE_ACK:
4165 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4166 what = write_acked_by_peer;
4167 break;
4168 case P_RECV_ACK:
4169 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4170 what = recv_acked_by_peer;
4171 break;
4172 case P_DISCARD_ACK:
4173 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4174 what = conflict_discarded_by_peer;
4175 break;
4176 default:
4177 D_ASSERT(0);
4178 return FALSE;
4179 }
4180
4181 return validate_req_change_req_state(mdev, p->block_id, sector,
4182 _ack_id_to_req, __func__ , what);
4183}
4184
4185static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4186{
4187 struct p_block_ack *p = (struct p_block_ack *)h;
4188 sector_t sector = be64_to_cpu(p->sector);
4189
4190 if (__ratelimit(&drbd_ratelimit_state))
4191 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4192
4193 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4194
4195 if (is_syncer_block_id(p->block_id)) {
4196 int size = be32_to_cpu(p->blksize);
4197 dec_rs_pending(mdev);
4198 drbd_rs_failed_io(mdev, sector, size);
4199 return TRUE;
4200 }
4201 return validate_req_change_req_state(mdev, p->block_id, sector,
4202 _ack_id_to_req, __func__ , neg_acked);
4203}
4204
4205static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4206{
4207 struct p_block_ack *p = (struct p_block_ack *)h;
4208 sector_t sector = be64_to_cpu(p->sector);
4209
4210 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4211 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4212 (unsigned long long)sector, be32_to_cpu(p->blksize));
4213
4214 return validate_req_change_req_state(mdev, p->block_id, sector,
4215 _ar_id_to_req, __func__ , neg_acked);
4216}
4217
4218static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4219{
4220 sector_t sector;
4221 int size;
4222 struct p_block_ack *p = (struct p_block_ack *)h;
4223
4224 sector = be64_to_cpu(p->sector);
4225 size = be32_to_cpu(p->blksize);
4226 D_ASSERT(p->block_id == ID_SYNCER);
4227
4228 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4229
4230 dec_rs_pending(mdev);
4231
4232 if (get_ldev_if_state(mdev, D_FAILED)) {
4233 drbd_rs_complete_io(mdev, sector);
4234 drbd_rs_failed_io(mdev, sector, size);
4235 put_ldev(mdev);
4236 }
4237
4238 return TRUE;
4239}
4240
4241static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4242{
4243 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4244
4245 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4246
4247 return TRUE;
4248}
4249
4250static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4251{
4252 struct p_block_ack *p = (struct p_block_ack *)h;
4253 struct drbd_work *w;
4254 sector_t sector;
4255 int size;
4256
4257 sector = be64_to_cpu(p->sector);
4258 size = be32_to_cpu(p->blksize);
4259
4260 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4261
4262 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4263 drbd_ov_oos_found(mdev, sector, size);
4264 else
4265 ov_oos_print(mdev);
4266
4267 drbd_rs_complete_io(mdev, sector);
4268 dec_rs_pending(mdev);
4269
4270 if (--mdev->ov_left == 0) {
4271 w = kmalloc(sizeof(*w), GFP_NOIO);
4272 if (w) {
4273 w->cb = w_ov_finished;
4274 drbd_queue_work_front(&mdev->data.work, w);
4275 } else {
4276 dev_err(DEV, "kmalloc(w) failed.");
4277 ov_oos_print(mdev);
4278 drbd_resync_finished(mdev);
4279 }
4280 }
4281 return TRUE;
4282}
4283
/* One entry of the asender dispatch table, see get_asender_cmd(). */
struct asender_cmd {
	/* full packet size including struct p_header; drbd_asender() uses it
	 * as the number of bytes to receive and checks the header's length
	 * field against pkt_size - sizeof(struct p_header) */
	size_t pkt_size;
	/* packet handler; a zero return makes drbd_asender() force
	 * C_NETWORK_FAILURE */
	int (*process)(struct drbd_conf *mdev, struct p_header *h);
};
4288
4289static struct asender_cmd *get_asender_cmd(int cmd)
4290{
4291 static struct asender_cmd asender_tbl[] = {
4292 /* anything missing from this table is in
4293 * the drbd_cmd_handler (drbd_default_handler) table,
4294 * see the beginning of drbdd() */
4295 [P_PING] = { sizeof(struct p_header), got_Ping },
4296 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4297 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4298 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4299 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4300 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4301 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4302 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4303 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4304 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4305 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4306 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4307 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4308 [P_MAX_CMD] = { 0, NULL },
4309 };
4310 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4311 return NULL;
4312 return &asender_tbl[cmd];
4313}
4314
4315int drbd_asender(struct drbd_thread *thi)
4316{
4317 struct drbd_conf *mdev = thi->mdev;
4318 struct p_header *h = &mdev->meta.rbuf.header;
4319 struct asender_cmd *cmd = NULL;
4320
4321 int rv, len;
4322 void *buf = h;
4323 int received = 0;
4324 int expect = sizeof(struct p_header);
4325 int empty;
4326
4327 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4328
4329 current->policy = SCHED_RR; /* Make this a realtime task! */
4330 current->rt_priority = 2; /* more important than all other tasks */
4331
4332 while (get_t_state(thi) == Running) {
4333 drbd_thread_current_set_cpu(mdev);
4334 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4335 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4336 mdev->meta.socket->sk->sk_rcvtimeo =
4337 mdev->net_conf->ping_timeo*HZ/10;
4338 }
4339
4340 /* conditionally cork;
4341 * it may hurt latency if we cork without much to send */
4342 if (!mdev->net_conf->no_cork &&
4343 3 < atomic_read(&mdev->unacked_cnt))
4344 drbd_tcp_cork(mdev->meta.socket);
4345 while (1) {
4346 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4347 flush_signals(current);
4348 if (!drbd_process_done_ee(mdev)) {
4349 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4350 goto reconnect;
4351 }
4352 /* to avoid race with newly queued ACKs */
4353 set_bit(SIGNAL_ASENDER, &mdev->flags);
4354 spin_lock_irq(&mdev->req_lock);
4355 empty = list_empty(&mdev->done_ee);
4356 spin_unlock_irq(&mdev->req_lock);
4357 /* new ack may have been queued right here,
4358 * but then there is also a signal pending,
4359 * and we start over... */
4360 if (empty)
4361 break;
4362 }
4363 /* but unconditionally uncork unless disabled */
4364 if (!mdev->net_conf->no_cork)
4365 drbd_tcp_uncork(mdev->meta.socket);
4366
4367 /* short circuit, recv_msg would return EINTR anyways. */
4368 if (signal_pending(current))
4369 continue;
4370
4371 rv = drbd_recv_short(mdev, mdev->meta.socket,
4372 buf, expect-received, 0);
4373 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4374
4375 flush_signals(current);
4376
4377 /* Note:
4378 * -EINTR (on meta) we got a signal
4379 * -EAGAIN (on meta) rcvtimeo expired
4380 * -ECONNRESET other side closed the connection
4381 * -ERESTARTSYS (on data) we got a signal
4382 * rv < 0 other than above: unexpected error!
4383 * rv == expected: full header or command
4384 * rv < expected: "woken" by signal during receive
4385 * rv == 0 : "connection shut down by peer"
4386 */
4387 if (likely(rv > 0)) {
4388 received += rv;
4389 buf += rv;
4390 } else if (rv == 0) {
4391 dev_err(DEV, "meta connection shut down by peer.\n");
4392 goto reconnect;
4393 } else if (rv == -EAGAIN) {
4394 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4395 mdev->net_conf->ping_timeo*HZ/10) {
4396 dev_err(DEV, "PingAck did not arrive in time.\n");
4397 goto reconnect;
4398 }
4399 set_bit(SEND_PING, &mdev->flags);
4400 continue;
4401 } else if (rv == -EINTR) {
4402 continue;
4403 } else {
4404 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4405 goto reconnect;
4406 }
4407
4408 if (received == expect && cmd == NULL) {
4409 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4410 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4411 (long)be32_to_cpu(h->magic),
4412 h->command, h->length);
4413 goto reconnect;
4414 }
4415 cmd = get_asender_cmd(be16_to_cpu(h->command));
4416 len = be16_to_cpu(h->length);
4417 if (unlikely(cmd == NULL)) {
4418 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4419 (long)be32_to_cpu(h->magic),
4420 h->command, h->length);
4421 goto disconnect;
4422 }
4423 expect = cmd->pkt_size;
4424 ERR_IF(len != expect-sizeof(struct p_header)) {
4425 trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
4426 goto reconnect;
4427 }
4428 }
4429 if (received == expect) {
4430 D_ASSERT(cmd != NULL);
4431 trace_drbd_packet(mdev, mdev->meta.socket, 1, (void *)h, __FILE__, __LINE__);
4432 if (!cmd->process(mdev, h))
4433 goto reconnect;
4434
4435 buf = h;
4436 received = 0;
4437 expect = sizeof(struct p_header);
4438 cmd = NULL;
4439 }
4440 }
4441
4442 if (0) {
4443reconnect:
4444 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4445 }
4446 if (0) {
4447disconnect:
4448 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4449 }
4450 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4451
4452 D_ASSERT(mdev->state.conn < C_CONNECTED);
4453 dev_info(DEV, "asender terminated\n");
4454
4455 return 0;
4456}