blob: 388a3e8bb0d0f9baa1ea7240cb0e08b33f5de8ee [file] [log] [blame]
Philipp Reisnerb411b362009-09-25 16:07:19 -07001/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
Philipp Reisnerb411b362009-09-25 16:07:19 -070026#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
Philipp Reisnerb411b362009-09-25 16:07:19 -070031#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/mm.h>
46#include <linux/string.h>
47#include <linux/scatterlist.h>
48#include "drbd_int.h"
Philipp Reisnerb411b362009-09-25 16:07:19 -070049#include "drbd_req.h"
50
51#include "drbd_vli.h"
52
53struct flush_work {
54 struct drbd_work w;
55 struct drbd_epoch *epoch;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
64static int drbd_do_handshake(struct drbd_conf *mdev);
65static int drbd_do_auth(struct drbd_conf *mdev);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
70static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71{
72 struct drbd_epoch *prev;
73 spin_lock(&mdev->epoch_lock);
74 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 if (prev == epoch || prev == mdev->current_epoch)
76 prev = NULL;
77 spin_unlock(&mdev->epoch_lock);
78 return prev;
79}
80
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
Lars Ellenberg45bb9122010-05-14 17:10:48 +020083/*
84 * some helper functions to deal with single linked page lists,
85 * page->private being our "next" pointer.
86 */
87
88/* If at least n pages are linked at head, get n pages off.
89 * Otherwise, don't modify head, and return NULL.
90 * Locking is the responsibility of the caller.
91 */
92static struct page *page_chain_del(struct page **head, int n)
93{
94 struct page *page;
95 struct page *tmp;
96
97 BUG_ON(!n);
98 BUG_ON(!head);
99
100 page = *head;
101 while (page) {
102 tmp = page_chain_next(page);
103 if (--n == 0)
104 break; /* found sufficient pages */
105 if (tmp == NULL)
106 /* insufficient pages, don't use any of them. */
107 return NULL;
108 page = tmp;
109 }
110
111 /* add end of list marker for the returned list */
112 set_page_private(page, 0);
113 /* actual return value, and adjustment of head */
114 page = *head;
115 *head = tmp;
116 return page;
117}
118
119/* may be used outside of locks to find the tail of a (usually short)
120 * "private" page chain, before adding it back to a global chain head
121 * with page_chain_add() under a spinlock. */
122static struct page *page_chain_tail(struct page *page, int *len)
123{
124 struct page *tmp;
125 int i = 1;
126 while ((tmp = page_chain_next(page)))
127 ++i, page = tmp;
128 if (len)
129 *len = i;
130 return page;
131}
132
133static int page_chain_free(struct page *page)
134{
135 struct page *tmp;
136 int i = 0;
137 page_chain_for_each_safe(page, tmp) {
138 put_page(page);
139 ++i;
140 }
141 return i;
142}
143
144static void page_chain_add(struct page **head,
145 struct page *chain_first, struct page *chain_last)
146{
147#if 1
148 struct page *tmp;
149 tmp = page_chain_tail(chain_first, NULL);
150 BUG_ON(tmp != chain_last);
151#endif
152
153 /* add chain to head */
154 set_page_private(chain_last, (unsigned long)*head);
155 *head = chain_first;
156}
157
158static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
Philipp Reisnerb411b362009-09-25 16:07:19 -0700159{
160 struct page *page = NULL;
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200161 struct page *tmp = NULL;
162 int i = 0;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700163
164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
165 * So what. It saves a spin_lock. */
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200166 if (drbd_pp_vacant >= number) {
Philipp Reisnerb411b362009-09-25 16:07:19 -0700167 spin_lock(&drbd_pp_lock);
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200168 page = page_chain_del(&drbd_pp_pool, number);
169 if (page)
170 drbd_pp_vacant -= number;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700171 spin_unlock(&drbd_pp_lock);
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200172 if (page)
173 return page;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700174 }
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200175
Philipp Reisnerb411b362009-09-25 16:07:19 -0700176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 * which in turn might block on the other node at this very place. */
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200179 for (i = 0; i < number; i++) {
180 tmp = alloc_page(GFP_TRY);
181 if (!tmp)
182 break;
183 set_page_private(tmp, (unsigned long)page);
184 page = tmp;
185 }
186
187 if (i == number)
188 return page;
189
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_pp_alloc will retry this
192 * function "soon". */
193 if (page) {
194 tmp = page_chain_tail(page, NULL);
195 spin_lock(&drbd_pp_lock);
196 page_chain_add(&drbd_pp_pool, page, tmp);
197 drbd_pp_vacant += i;
198 spin_unlock(&drbd_pp_lock);
199 }
200 return NULL;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700201}
202
203/* kick lower level device, if we have more than (arbitrary number)
204 * reference counts on it, which typically are locally submitted io
205 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
206static void maybe_kick_lo(struct drbd_conf *mdev)
207{
208 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
209 drbd_kick_lo(mdev);
210}
211
212static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
213{
214 struct drbd_epoch_entry *e;
215 struct list_head *le, *tle;
216
217 /* The EEs are always appended to the end of the list. Since
218 they are sent in order over the wire, they have to finish
219 in order. As soon as we see the first not finished we can
220 stop to examine the list... */
221
222 list_for_each_safe(le, tle, &mdev->net_ee) {
223 e = list_entry(le, struct drbd_epoch_entry, w.list);
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200224 if (drbd_ee_has_active_page(e))
Philipp Reisnerb411b362009-09-25 16:07:19 -0700225 break;
226 list_move(le, to_be_freed);
227 }
228}
229
230static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
231{
232 LIST_HEAD(reclaimed);
233 struct drbd_epoch_entry *e, *t;
234
235 maybe_kick_lo(mdev);
236 spin_lock_irq(&mdev->req_lock);
237 reclaim_net_ee(mdev, &reclaimed);
238 spin_unlock_irq(&mdev->req_lock);
239
240 list_for_each_entry_safe(e, t, &reclaimed, w.list)
241 drbd_free_ee(mdev, e);
242}
243
244/**
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200245 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
Philipp Reisnerb411b362009-09-25 16:07:19 -0700246 * @mdev: DRBD device.
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200247 * @number: number of pages requested
248 * @retry: whether to retry, if not enough pages are available right now
Philipp Reisnerb411b362009-09-25 16:07:19 -0700249 *
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200250 * Tries to allocate number pages, first from our own page pool, then from
251 * the kernel, unless this allocation would exceed the max_buffers setting.
252 * Possibly retry until DRBD frees sufficient pages somewhere else.
253 *
254 * Returns a page chain linked via page->private.
Philipp Reisnerb411b362009-09-25 16:07:19 -0700255 */
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200256static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
Philipp Reisnerb411b362009-09-25 16:07:19 -0700257{
258 struct page *page = NULL;
259 DEFINE_WAIT(wait);
260
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200261 /* Yes, we may run up to @number over max_buffers. If we
262 * follow it strictly, the admin will get it wrong anyways. */
263 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
264 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700265
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200266 while (page == NULL) {
Philipp Reisnerb411b362009-09-25 16:07:19 -0700267 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
268
269 drbd_kick_lo_and_reclaim_net(mdev);
270
271 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200272 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700273 if (page)
274 break;
275 }
276
277 if (!retry)
278 break;
279
280 if (signal_pending(current)) {
281 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
282 break;
283 }
284
285 schedule();
286 }
287 finish_wait(&drbd_pp_wait, &wait);
288
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200289 if (page)
290 atomic_add(number, &mdev->pp_in_use);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700291 return page;
292}
293
294/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200295 * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
296 * Either links the page chain back to the global pool,
297 * or returns all pages to the system. */
Philipp Reisnerb411b362009-09-25 16:07:19 -0700298static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
299{
Philipp Reisnerb411b362009-09-25 16:07:19 -0700300 int i;
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200301 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
302 i = page_chain_free(page);
303 else {
304 struct page *tmp;
305 tmp = page_chain_tail(page, &i);
306 spin_lock(&drbd_pp_lock);
307 page_chain_add(&drbd_pp_pool, page, tmp);
308 drbd_pp_vacant += i;
309 spin_unlock(&drbd_pp_lock);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700310 }
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200311 atomic_sub(i, &mdev->pp_in_use);
312 i = atomic_read(&mdev->pp_in_use);
313 if (i < 0)
314 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700315 wake_up(&drbd_pp_wait);
316}
317
318/*
319You need to hold the req_lock:
320 _drbd_wait_ee_list_empty()
321
322You must not have the req_lock:
323 drbd_free_ee()
324 drbd_alloc_ee()
325 drbd_init_ee()
326 drbd_release_ee()
327 drbd_ee_fix_bhs()
328 drbd_process_done_ee()
329 drbd_clear_done_ee()
330 drbd_wait_ee_list_empty()
331*/
332
333struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
334 u64 id,
335 sector_t sector,
336 unsigned int data_size,
337 gfp_t gfp_mask) __must_hold(local)
338{
Philipp Reisnerb411b362009-09-25 16:07:19 -0700339 struct drbd_epoch_entry *e;
340 struct page *page;
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200341 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700342
343 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
344 return NULL;
345
346 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
347 if (!e) {
348 if (!(gfp_mask & __GFP_NOWARN))
349 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
350 return NULL;
351 }
352
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200353 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
354 if (!page)
355 goto fail;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700356
Philipp Reisnerb411b362009-09-25 16:07:19 -0700357 INIT_HLIST_NODE(&e->colision);
358 e->epoch = NULL;
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200359 e->mdev = mdev;
360 e->pages = page;
361 atomic_set(&e->pending_bios, 0);
362 e->size = data_size;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700363 e->flags = 0;
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200364 e->sector = sector;
365 e->sector = sector;
366 e->block_id = id;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700367
Philipp Reisnerb411b362009-09-25 16:07:19 -0700368 return e;
369
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200370 fail:
Philipp Reisnerb411b362009-09-25 16:07:19 -0700371 mempool_free(e, drbd_ee_mempool);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700372 return NULL;
373}
374
375void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
376{
Lars Ellenberg45bb9122010-05-14 17:10:48 +0200377 drbd_pp_free(mdev, e->pages);
378 D_ASSERT(atomic_read(&e->pending_bios) == 0);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700379 D_ASSERT(hlist_unhashed(&e->colision));
380 mempool_free(e, drbd_ee_mempool);
381}
382
383int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
384{
385 LIST_HEAD(work_list);
386 struct drbd_epoch_entry *e, *t;
387 int count = 0;
388
389 spin_lock_irq(&mdev->req_lock);
390 list_splice_init(list, &work_list);
391 spin_unlock_irq(&mdev->req_lock);
392
393 list_for_each_entry_safe(e, t, &work_list, w.list) {
394 drbd_free_ee(mdev, e);
395 count++;
396 }
397 return count;
398}
399
400
401/*
402 * This function is called from _asender only_
403 * but see also comments in _req_mod(,barrier_acked)
404 * and receive_Barrier.
405 *
406 * Move entries from net_ee to done_ee, if ready.
407 * Grab done_ee, call all callbacks, free the entries.
408 * The callbacks typically send out ACKs.
409 */
410static int drbd_process_done_ee(struct drbd_conf *mdev)
411{
412 LIST_HEAD(work_list);
413 LIST_HEAD(reclaimed);
414 struct drbd_epoch_entry *e, *t;
415 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
416
417 spin_lock_irq(&mdev->req_lock);
418 reclaim_net_ee(mdev, &reclaimed);
419 list_splice_init(&mdev->done_ee, &work_list);
420 spin_unlock_irq(&mdev->req_lock);
421
422 list_for_each_entry_safe(e, t, &reclaimed, w.list)
423 drbd_free_ee(mdev, e);
424
425 /* possible callbacks here:
426 * e_end_block, and e_end_resync_block, e_send_discard_ack.
427 * all ignore the last argument.
428 */
429 list_for_each_entry_safe(e, t, &work_list, w.list) {
Philipp Reisnerb411b362009-09-25 16:07:19 -0700430 /* list_del not necessary, next/prev members not touched */
431 ok = e->w.cb(mdev, &e->w, !ok) && ok;
432 drbd_free_ee(mdev, e);
433 }
434 wake_up(&mdev->ee_wait);
435
436 return ok;
437}
438
439void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
440{
441 DEFINE_WAIT(wait);
442
443 /* avoids spin_lock/unlock
444 * and calling prepare_to_wait in the fast path */
445 while (!list_empty(head)) {
446 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
447 spin_unlock_irq(&mdev->req_lock);
448 drbd_kick_lo(mdev);
449 schedule();
450 finish_wait(&mdev->ee_wait, &wait);
451 spin_lock_irq(&mdev->req_lock);
452 }
453}
454
455void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456{
457 spin_lock_irq(&mdev->req_lock);
458 _drbd_wait_ee_list_empty(mdev, head);
459 spin_unlock_irq(&mdev->req_lock);
460}
461
462/* see also kernel_accept; which is only present since 2.6.18.
463 * also we want to log which part of it failed, exactly */
464static int drbd_accept(struct drbd_conf *mdev, const char **what,
465 struct socket *sock, struct socket **newsock)
466{
467 struct sock *sk = sock->sk;
468 int err = 0;
469
470 *what = "listen";
471 err = sock->ops->listen(sock, 5);
472 if (err < 0)
473 goto out;
474
475 *what = "sock_create_lite";
476 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
477 newsock);
478 if (err < 0)
479 goto out;
480
481 *what = "accept";
482 err = sock->ops->accept(sock, *newsock, 0);
483 if (err < 0) {
484 sock_release(*newsock);
485 *newsock = NULL;
486 goto out;
487 }
488 (*newsock)->ops = sock->ops;
489
490out:
491 return err;
492}
493
494static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
495 void *buf, size_t size, int flags)
496{
497 mm_segment_t oldfs;
498 struct kvec iov = {
499 .iov_base = buf,
500 .iov_len = size,
501 };
502 struct msghdr msg = {
503 .msg_iovlen = 1,
504 .msg_iov = (struct iovec *)&iov,
505 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506 };
507 int rv;
508
509 oldfs = get_fs();
510 set_fs(KERNEL_DS);
511 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512 set_fs(oldfs);
513
514 return rv;
515}
516
517static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
518{
519 mm_segment_t oldfs;
520 struct kvec iov = {
521 .iov_base = buf,
522 .iov_len = size,
523 };
524 struct msghdr msg = {
525 .msg_iovlen = 1,
526 .msg_iov = (struct iovec *)&iov,
527 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528 };
529 int rv;
530
531 oldfs = get_fs();
532 set_fs(KERNEL_DS);
533
534 for (;;) {
535 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
536 if (rv == size)
537 break;
538
539 /* Note:
540 * ECONNRESET other side closed the connection
541 * ERESTARTSYS (on sock) we got a signal
542 */
543
544 if (rv < 0) {
545 if (rv == -ECONNRESET)
546 dev_info(DEV, "sock was reset by peer\n");
547 else if (rv != -ERESTARTSYS)
548 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
549 break;
550 } else if (rv == 0) {
551 dev_info(DEV, "sock was shut down by peer\n");
552 break;
553 } else {
554 /* signal came in, or peer/link went down,
555 * after we read a partial message
556 */
557 /* D_ASSERT(signal_pending(current)); */
558 break;
559 }
560 };
561
562 set_fs(oldfs);
563
564 if (rv != size)
565 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
566
567 return rv;
568}
569
570static struct socket *drbd_try_connect(struct drbd_conf *mdev)
571{
572 const char *what;
573 struct socket *sock;
574 struct sockaddr_in6 src_in6;
575 int err;
576 int disconnect_on_error = 1;
577
578 if (!get_net_conf(mdev))
579 return NULL;
580
581 what = "sock_create_kern";
582 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
583 SOCK_STREAM, IPPROTO_TCP, &sock);
584 if (err < 0) {
585 sock = NULL;
586 goto out;
587 }
588
589 sock->sk->sk_rcvtimeo =
590 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
591
592 /* explicitly bind to the configured IP as source IP
593 * for the outgoing connections.
594 * This is needed for multihomed hosts and to be
595 * able to use lo: interfaces for drbd.
596 * Make sure to use 0 as port number, so linux selects
597 * a free one dynamically.
598 */
599 memcpy(&src_in6, mdev->net_conf->my_addr,
600 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
601 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
602 src_in6.sin6_port = 0;
603 else
604 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
605
606 what = "bind before connect";
607 err = sock->ops->bind(sock,
608 (struct sockaddr *) &src_in6,
609 mdev->net_conf->my_addr_len);
610 if (err < 0)
611 goto out;
612
613 /* connect may fail, peer not yet available.
614 * stay C_WF_CONNECTION, don't go Disconnecting! */
615 disconnect_on_error = 0;
616 what = "connect";
617 err = sock->ops->connect(sock,
618 (struct sockaddr *)mdev->net_conf->peer_addr,
619 mdev->net_conf->peer_addr_len, 0);
620
621out:
622 if (err < 0) {
623 if (sock) {
624 sock_release(sock);
625 sock = NULL;
626 }
627 switch (-err) {
628 /* timeout, busy, signal pending */
629 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
630 case EINTR: case ERESTARTSYS:
631 /* peer not (yet) available, network problem */
632 case ECONNREFUSED: case ENETUNREACH:
633 case EHOSTDOWN: case EHOSTUNREACH:
634 disconnect_on_error = 0;
635 break;
636 default:
637 dev_err(DEV, "%s failed, err = %d\n", what, err);
638 }
639 if (disconnect_on_error)
640 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
641 }
642 put_net_conf(mdev);
643 return sock;
644}
645
646static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
647{
648 int timeo, err;
649 struct socket *s_estab = NULL, *s_listen;
650 const char *what;
651
652 if (!get_net_conf(mdev))
653 return NULL;
654
655 what = "sock_create_kern";
656 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
657 SOCK_STREAM, IPPROTO_TCP, &s_listen);
658 if (err) {
659 s_listen = NULL;
660 goto out;
661 }
662
663 timeo = mdev->net_conf->try_connect_int * HZ;
664 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
665
666 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
667 s_listen->sk->sk_rcvtimeo = timeo;
668 s_listen->sk->sk_sndtimeo = timeo;
669
670 what = "bind before listen";
671 err = s_listen->ops->bind(s_listen,
672 (struct sockaddr *) mdev->net_conf->my_addr,
673 mdev->net_conf->my_addr_len);
674 if (err < 0)
675 goto out;
676
677 err = drbd_accept(mdev, &what, s_listen, &s_estab);
678
679out:
680 if (s_listen)
681 sock_release(s_listen);
682 if (err < 0) {
683 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
684 dev_err(DEV, "%s failed, err = %d\n", what, err);
685 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
686 }
687 }
688 put_net_conf(mdev);
689
690 return s_estab;
691}
692
693static int drbd_send_fp(struct drbd_conf *mdev,
694 struct socket *sock, enum drbd_packets cmd)
695{
696 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
697
698 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
699}
700
701static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
702{
703 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
704 int rr;
705
706 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
707
708 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
709 return be16_to_cpu(h->command);
710
711 return 0xffff;
712}
713
714/**
715 * drbd_socket_okay() - Free the socket if its connection is not okay
716 * @mdev: DRBD device.
717 * @sock: pointer to the pointer to the socket.
718 */
719static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
720{
721 int rr;
722 char tb[4];
723
724 if (!*sock)
725 return FALSE;
726
727 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
728
729 if (rr > 0 || rr == -EAGAIN) {
730 return TRUE;
731 } else {
732 sock_release(*sock);
733 *sock = NULL;
734 return FALSE;
735 }
736}
737
738/*
739 * return values:
740 * 1 yes, we have a valid connection
741 * 0 oops, did not work out, please try again
742 * -1 peer talks different language,
743 * no point in trying again, please go standalone.
744 * -2 We do not have a network config...
745 */
746static int drbd_connect(struct drbd_conf *mdev)
747{
748 struct socket *s, *sock, *msock;
749 int try, h, ok;
750
751 D_ASSERT(!mdev->data.socket);
752
753 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
754 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
755
756 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757 return -2;
758
759 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760
761 sock = NULL;
762 msock = NULL;
763
764 do {
765 for (try = 0;;) {
766 /* 3 tries, this should take less than a second! */
767 s = drbd_try_connect(mdev);
768 if (s || ++try >= 3)
769 break;
770 /* give the other side time to call bind() & listen() */
771 __set_current_state(TASK_INTERRUPTIBLE);
772 schedule_timeout(HZ / 10);
773 }
774
775 if (s) {
776 if (!sock) {
777 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
778 sock = s;
779 s = NULL;
780 } else if (!msock) {
781 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
782 msock = s;
783 s = NULL;
784 } else {
785 dev_err(DEV, "Logic error in drbd_connect()\n");
786 goto out_release_sockets;
787 }
788 }
789
790 if (sock && msock) {
791 __set_current_state(TASK_INTERRUPTIBLE);
792 schedule_timeout(HZ / 10);
793 ok = drbd_socket_okay(mdev, &sock);
794 ok = drbd_socket_okay(mdev, &msock) && ok;
795 if (ok)
796 break;
797 }
798
799retry:
800 s = drbd_wait_for_connect(mdev);
801 if (s) {
802 try = drbd_recv_fp(mdev, s);
803 drbd_socket_okay(mdev, &sock);
804 drbd_socket_okay(mdev, &msock);
805 switch (try) {
806 case P_HAND_SHAKE_S:
807 if (sock) {
808 dev_warn(DEV, "initial packet S crossed\n");
809 sock_release(sock);
810 }
811 sock = s;
812 break;
813 case P_HAND_SHAKE_M:
814 if (msock) {
815 dev_warn(DEV, "initial packet M crossed\n");
816 sock_release(msock);
817 }
818 msock = s;
819 set_bit(DISCARD_CONCURRENT, &mdev->flags);
820 break;
821 default:
822 dev_warn(DEV, "Error receiving initial packet\n");
823 sock_release(s);
824 if (random32() & 1)
825 goto retry;
826 }
827 }
828
829 if (mdev->state.conn <= C_DISCONNECTING)
830 goto out_release_sockets;
831 if (signal_pending(current)) {
832 flush_signals(current);
833 smp_rmb();
834 if (get_t_state(&mdev->receiver) == Exiting)
835 goto out_release_sockets;
836 }
837
838 if (sock && msock) {
839 ok = drbd_socket_okay(mdev, &sock);
840 ok = drbd_socket_okay(mdev, &msock) && ok;
841 if (ok)
842 break;
843 }
844 } while (1);
845
846 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
847 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
848
849 sock->sk->sk_allocation = GFP_NOIO;
850 msock->sk->sk_allocation = GFP_NOIO;
851
852 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
853 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
854
855 if (mdev->net_conf->sndbuf_size) {
856 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
857 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
858 }
859
860 if (mdev->net_conf->rcvbuf_size) {
861 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
862 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
863 }
864
865 /* NOT YET ...
866 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
867 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
868 * first set it to the P_HAND_SHAKE timeout,
869 * which we set to 4x the configured ping_timeout. */
870 sock->sk->sk_sndtimeo =
871 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
872
873 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
874 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
875
876 /* we don't want delays.
877 * we use TCP_CORK where apropriate, though */
878 drbd_tcp_nodelay(sock);
879 drbd_tcp_nodelay(msock);
880
881 mdev->data.socket = sock;
882 mdev->meta.socket = msock;
883 mdev->last_received = jiffies;
884
885 D_ASSERT(mdev->asender.task == NULL);
886
887 h = drbd_do_handshake(mdev);
888 if (h <= 0)
889 return h;
890
891 if (mdev->cram_hmac_tfm) {
892 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
Johannes Thomab10d96c2010-01-07 16:02:50 +0100893 switch (drbd_do_auth(mdev)) {
894 case -1:
Philipp Reisnerb411b362009-09-25 16:07:19 -0700895 dev_err(DEV, "Authentication of peer failed\n");
896 return -1;
Johannes Thomab10d96c2010-01-07 16:02:50 +0100897 case 0:
898 dev_err(DEV, "Authentication of peer failed, trying again.\n");
899 return 0;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700900 }
901 }
902
903 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
904 return 0;
905
906 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
907 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
908
909 atomic_set(&mdev->packet_seq, 0);
910 mdev->peer_seq = 0;
911
912 drbd_thread_start(&mdev->asender);
913
Philipp Reisner7e2455c2010-04-22 14:50:23 +0200914 if (!drbd_send_protocol(mdev))
915 return -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -0700916 drbd_send_sync_param(mdev, &mdev->sync_conf);
Philipp Reisnere89b5912010-03-24 17:11:33 +0100917 drbd_send_sizes(mdev, 0, 0);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700918 drbd_send_uuids(mdev);
919 drbd_send_state(mdev);
920 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
921 clear_bit(RESIZE_PENDING, &mdev->flags);
922
923 return 1;
924
925out_release_sockets:
926 if (sock)
927 sock_release(sock);
928 if (msock)
929 sock_release(msock);
930 return -1;
931}
932
933static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
934{
935 int r;
936
937 r = drbd_recv(mdev, h, sizeof(*h));
938
939 if (unlikely(r != sizeof(*h))) {
940 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
941 return FALSE;
942 };
943 h->command = be16_to_cpu(h->command);
944 h->length = be16_to_cpu(h->length);
945 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
946 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
947 (long)be32_to_cpu(h->magic),
948 h->command, h->length);
949 return FALSE;
950 }
951 mdev->last_received = jiffies;
952
953 return TRUE;
954}
955
956static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
957{
958 int rv;
959
960 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
Dmitry Monakhovfbd9b092010-04-28 17:55:06 +0400961 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
962 NULL, BLKDEV_IFL_WAIT);
Philipp Reisnerb411b362009-09-25 16:07:19 -0700963 if (rv) {
964 dev_err(DEV, "local disk flush failed with status %d\n", rv);
965 /* would rather check on EOPNOTSUPP, but that is not reliable.
966 * don't try again for ANY return value != 0
967 * if (rv == -EOPNOTSUPP) */
968 drbd_bump_write_ordering(mdev, WO_drain_io);
969 }
970 put_ldev(mdev);
971 }
972
973 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
974}
975
976static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
977{
978 struct flush_work *fw = (struct flush_work *)w;
979 struct drbd_epoch *epoch = fw->epoch;
980
981 kfree(w);
982
983 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
984 drbd_flush_after_epoch(mdev, epoch);
985
986 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
987 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
988
989 return 1;
990}
991
992/**
993 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
994 * @mdev: DRBD device.
995 * @epoch: Epoch object.
996 * @ev: Epoch event.
997 */
998static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
999 struct drbd_epoch *epoch,
1000 enum epoch_event ev)
1001{
1002 int finish, epoch_size;
1003 struct drbd_epoch *next_epoch;
1004 int schedule_flush = 0;
1005 enum finish_epoch rv = FE_STILL_LIVE;
1006
1007 spin_lock(&mdev->epoch_lock);
1008 do {
1009 next_epoch = NULL;
1010 finish = 0;
1011
1012 epoch_size = atomic_read(&epoch->epoch_size);
1013
1014 switch (ev & ~EV_CLEANUP) {
1015 case EV_PUT:
1016 atomic_dec(&epoch->active);
1017 break;
1018 case EV_GOT_BARRIER_NR:
1019 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1020
1021 /* Special case: If we just switched from WO_bio_barrier to
1022 WO_bdev_flush we should not finish the current epoch */
1023 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1024 mdev->write_ordering != WO_bio_barrier &&
1025 epoch == mdev->current_epoch)
1026 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1027 break;
1028 case EV_BARRIER_DONE:
1029 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1030 break;
1031 case EV_BECAME_LAST:
1032 /* nothing to do*/
1033 break;
1034 }
1035
Philipp Reisnerb411b362009-09-25 16:07:19 -07001036 if (epoch_size != 0 &&
1037 atomic_read(&epoch->active) == 0 &&
1038 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1039 epoch->list.prev == &mdev->current_epoch->list &&
1040 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1041 /* Nearly all conditions are met to finish that epoch... */
1042 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1043 mdev->write_ordering == WO_none ||
1044 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1045 ev & EV_CLEANUP) {
1046 finish = 1;
1047 set_bit(DE_IS_FINISHING, &epoch->flags);
1048 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1049 mdev->write_ordering == WO_bio_barrier) {
1050 atomic_inc(&epoch->active);
1051 schedule_flush = 1;
1052 }
1053 }
1054 if (finish) {
1055 if (!(ev & EV_CLEANUP)) {
1056 spin_unlock(&mdev->epoch_lock);
1057 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1058 spin_lock(&mdev->epoch_lock);
1059 }
1060 dec_unacked(mdev);
1061
1062 if (mdev->current_epoch != epoch) {
1063 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1064 list_del(&epoch->list);
1065 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1066 mdev->epochs--;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001067 kfree(epoch);
1068
1069 if (rv == FE_STILL_LIVE)
1070 rv = FE_DESTROYED;
1071 } else {
1072 epoch->flags = 0;
1073 atomic_set(&epoch->epoch_size, 0);
1074 /* atomic_set(&epoch->active, 0); is alrady zero */
1075 if (rv == FE_STILL_LIVE)
1076 rv = FE_RECYCLED;
1077 }
1078 }
1079
1080 if (!next_epoch)
1081 break;
1082
1083 epoch = next_epoch;
1084 } while (1);
1085
1086 spin_unlock(&mdev->epoch_lock);
1087
1088 if (schedule_flush) {
1089 struct flush_work *fw;
1090 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1091 if (fw) {
Philipp Reisnerb411b362009-09-25 16:07:19 -07001092 fw->w.cb = w_flush;
1093 fw->epoch = epoch;
1094 drbd_queue_work(&mdev->data.work, &fw->w);
1095 } else {
1096 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1097 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1098 /* That is not a recursion, only one level */
1099 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1100 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1101 }
1102 }
1103
1104 return rv;
1105}
1106
1107/**
1108 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1109 * @mdev: DRBD device.
1110 * @wo: Write ordering method to try.
1111 */
1112void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1113{
1114 enum write_ordering_e pwo;
1115 static char *write_ordering_str[] = {
1116 [WO_none] = "none",
1117 [WO_drain_io] = "drain",
1118 [WO_bdev_flush] = "flush",
1119 [WO_bio_barrier] = "barrier",
1120 };
1121
1122 pwo = mdev->write_ordering;
1123 wo = min(pwo, wo);
1124 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1125 wo = WO_bdev_flush;
1126 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1127 wo = WO_drain_io;
1128 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1129 wo = WO_none;
1130 mdev->write_ordering = wo;
1131 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1132 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1133}
1134
1135/**
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001136 * drbd_submit_ee()
1137 * @mdev: DRBD device.
1138 * @e: epoch entry
1139 * @rw: flag field, see bio->bi_rw
1140 */
1141/* TODO allocate from our own bio_set. */
1142int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1143 const unsigned rw, const int fault_type)
1144{
1145 struct bio *bios = NULL;
1146 struct bio *bio;
1147 struct page *page = e->pages;
1148 sector_t sector = e->sector;
1149 unsigned ds = e->size;
1150 unsigned n_bios = 0;
1151 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1152
1153 /* In most cases, we will only need one bio. But in case the lower
1154 * level restrictions happen to be different at this offset on this
1155 * side than those of the sending peer, we may need to submit the
1156 * request in more than one bio. */
1157next_bio:
1158 bio = bio_alloc(GFP_NOIO, nr_pages);
1159 if (!bio) {
1160 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1161 goto fail;
1162 }
1163 /* > e->sector, unless this is the first bio */
1164 bio->bi_sector = sector;
1165 bio->bi_bdev = mdev->ldev->backing_bdev;
1166 /* we special case some flags in the multi-bio case, see below
1167 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1168 bio->bi_rw = rw;
1169 bio->bi_private = e;
1170 bio->bi_end_io = drbd_endio_sec;
1171
1172 bio->bi_next = bios;
1173 bios = bio;
1174 ++n_bios;
1175
1176 page_chain_for_each(page) {
1177 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1178 if (!bio_add_page(bio, page, len, 0)) {
1179 /* a single page must always be possible! */
1180 BUG_ON(bio->bi_vcnt == 0);
1181 goto next_bio;
1182 }
1183 ds -= len;
1184 sector += len >> 9;
1185 --nr_pages;
1186 }
1187 D_ASSERT(page == NULL);
1188 D_ASSERT(ds == 0);
1189
1190 atomic_set(&e->pending_bios, n_bios);
1191 do {
1192 bio = bios;
1193 bios = bios->bi_next;
1194 bio->bi_next = NULL;
1195
1196 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1197 if (bios)
1198 bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1199
1200 drbd_generic_make_request(mdev, fault_type, bio);
1201
1202 /* strip off BIO_RW_BARRIER,
1203 * unless it is the first or last bio */
1204 if (bios && bios->bi_next)
1205 bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1206 } while (bios);
1207 maybe_kick_lo(mdev);
1208 return 0;
1209
1210fail:
1211 while (bios) {
1212 bio = bios;
1213 bios = bios->bi_next;
1214 bio_put(bio);
1215 }
1216 return -ENOMEM;
1217}
1218
1219/**
Philipp Reisnerb411b362009-09-25 16:07:19 -07001220 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1221 * @mdev: DRBD device.
1222 * @w: work object.
1223 * @cancel: The connection will be closed anyways (unused in this callback)
1224 */
1225int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1226{
1227 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001228 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1229 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1230 so that we can finish that epoch in drbd_may_finish_epoch().
1231 That is necessary if we already have a long chain of Epochs, before
1232 we realize that BIO_RW_BARRIER is actually not supported */
1233
1234 /* As long as the -ENOTSUPP on the barrier is reported immediately
1235 that will never trigger. If it is reported late, we will just
1236 print that warning and continue correctly for all future requests
1237 with WO_bdev_flush */
1238 if (previous_epoch(mdev, e->epoch))
1239 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1240
Philipp Reisnerb411b362009-09-25 16:07:19 -07001241 /* we still have a local reference,
1242 * get_ldev was done in receive_Data. */
Philipp Reisnerb411b362009-09-25 16:07:19 -07001243
1244 e->w.cb = e_end_block;
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001245 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1246 /* drbd_submit_ee fails for one reason only:
1247 * if was not able to allocate sufficient bios.
1248 * requeue, try again later. */
1249 e->w.cb = w_e_reissue;
1250 drbd_queue_work(&mdev->data.work, &e->w);
1251 }
Philipp Reisnerb411b362009-09-25 16:07:19 -07001252 return 1;
1253}
1254
1255static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1256{
1257 int rv, issue_flush;
1258 struct p_barrier *p = (struct p_barrier *)h;
1259 struct drbd_epoch *epoch;
1260
1261 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1262
1263 rv = drbd_recv(mdev, h->payload, h->length);
1264 ERR_IF(rv != h->length) return FALSE;
1265
1266 inc_unacked(mdev);
1267
1268 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1269 drbd_kick_lo(mdev);
1270
1271 mdev->current_epoch->barrier_nr = p->barrier;
1272 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1273
1274 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1275 * the activity log, which means it would not be resynced in case the
1276 * R_PRIMARY crashes now.
1277 * Therefore we must send the barrier_ack after the barrier request was
1278 * completed. */
1279 switch (mdev->write_ordering) {
1280 case WO_bio_barrier:
1281 case WO_none:
1282 if (rv == FE_RECYCLED)
1283 return TRUE;
1284 break;
1285
1286 case WO_bdev_flush:
1287 case WO_drain_io:
Philipp Reisner367a8d72009-12-29 15:56:01 +01001288 if (rv == FE_STILL_LIVE) {
1289 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1290 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1291 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1292 }
Philipp Reisnerb411b362009-09-25 16:07:19 -07001293 if (rv == FE_RECYCLED)
1294 return TRUE;
1295
1296 /* The asender will send all the ACKs and barrier ACKs out, since
1297 all EEs moved from the active_ee to the done_ee. We need to
1298 provide a new epoch object for the EEs that come in soon */
1299 break;
1300 }
1301
1302 /* receiver context, in the writeout path of the other node.
1303 * avoid potential distributed deadlock */
1304 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1305 if (!epoch) {
1306 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
Dan Carpenterd3db7b42010-01-23 15:45:22 +03001307 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001308 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1309 if (issue_flush) {
1310 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1311 if (rv == FE_RECYCLED)
1312 return TRUE;
1313 }
1314
1315 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1316
1317 return TRUE;
1318 }
1319
1320 epoch->flags = 0;
1321 atomic_set(&epoch->epoch_size, 0);
1322 atomic_set(&epoch->active, 0);
1323
1324 spin_lock(&mdev->epoch_lock);
1325 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1326 list_add(&epoch->list, &mdev->current_epoch->list);
1327 mdev->current_epoch = epoch;
1328 mdev->epochs++;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001329 } else {
1330 /* The current_epoch got recycled while we allocated this one... */
1331 kfree(epoch);
1332 }
1333 spin_unlock(&mdev->epoch_lock);
1334
1335 return TRUE;
1336}
1337
1338/* used from receive_RSDataReply (recv_resync_read)
1339 * and from receive_Data */
1340static struct drbd_epoch_entry *
1341read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1342{
Lars Ellenberg66660322010-04-06 12:15:04 +02001343 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001344 struct drbd_epoch_entry *e;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001345 struct page *page;
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001346 int dgs, ds, rr;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001347 void *dig_in = mdev->int_dig_in;
1348 void *dig_vv = mdev->int_dig_vv;
Philipp Reisner6b4388a2010-04-26 14:11:45 +02001349 unsigned long *data;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001350
1351 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1352 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1353
1354 if (dgs) {
1355 rr = drbd_recv(mdev, dig_in, dgs);
1356 if (rr != dgs) {
1357 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1358 rr, dgs);
1359 return NULL;
1360 }
1361 }
1362
1363 data_size -= dgs;
1364
1365 ERR_IF(data_size & 0x1ff) return NULL;
1366 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1367
Lars Ellenberg66660322010-04-06 12:15:04 +02001368 /* even though we trust out peer,
1369 * we sometimes have to double check. */
1370 if (sector + (data_size>>9) > capacity) {
1371 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1372 (unsigned long long)capacity,
1373 (unsigned long long)sector, data_size);
1374 return NULL;
1375 }
1376
Philipp Reisnerb411b362009-09-25 16:07:19 -07001377 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1378 * "criss-cross" setup, that might cause write-out on some other DRBD,
1379 * which in turn might block on the other node at this very place. */
1380 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1381 if (!e)
1382 return NULL;
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001383
Philipp Reisnerb411b362009-09-25 16:07:19 -07001384 ds = data_size;
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001385 page = e->pages;
1386 page_chain_for_each(page) {
1387 unsigned len = min_t(int, ds, PAGE_SIZE);
Philipp Reisner6b4388a2010-04-26 14:11:45 +02001388 data = kmap(page);
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001389 rr = drbd_recv(mdev, data, len);
Philipp Reisner6b4388a2010-04-26 14:11:45 +02001390 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1391 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1392 data[0] = data[0] ^ (unsigned long)-1;
1393 }
Philipp Reisnerb411b362009-09-25 16:07:19 -07001394 kunmap(page);
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001395 if (rr != len) {
Philipp Reisnerb411b362009-09-25 16:07:19 -07001396 drbd_free_ee(mdev, e);
1397 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001398 rr, len);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001399 return NULL;
1400 }
1401 ds -= rr;
1402 }
1403
1404 if (dgs) {
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001405 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001406 if (memcmp(dig_in, dig_vv, dgs)) {
1407 dev_err(DEV, "Digest integrity check FAILED.\n");
1408 drbd_bcast_ee(mdev, "digest failed",
1409 dgs, dig_in, dig_vv, e);
1410 drbd_free_ee(mdev, e);
1411 return NULL;
1412 }
1413 }
1414 mdev->recv_cnt += data_size>>9;
1415 return e;
1416}
1417
1418/* drbd_drain_block() just takes a data block
1419 * out of the socket input buffer, and discards it.
1420 */
1421static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1422{
1423 struct page *page;
1424 int rr, rv = 1;
1425 void *data;
1426
Lars Ellenbergc3470cd2010-04-01 16:57:19 +02001427 if (!data_size)
1428 return TRUE;
1429
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001430 page = drbd_pp_alloc(mdev, 1, 1);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001431
1432 data = kmap(page);
1433 while (data_size) {
1434 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1435 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1436 rv = 0;
1437 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1438 rr, min_t(int, data_size, PAGE_SIZE));
1439 break;
1440 }
1441 data_size -= rr;
1442 }
1443 kunmap(page);
1444 drbd_pp_free(mdev, page);
1445 return rv;
1446}
1447
1448static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1449 sector_t sector, int data_size)
1450{
1451 struct bio_vec *bvec;
1452 struct bio *bio;
1453 int dgs, rr, i, expect;
1454 void *dig_in = mdev->int_dig_in;
1455 void *dig_vv = mdev->int_dig_vv;
1456
1457 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1458 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1459
1460 if (dgs) {
1461 rr = drbd_recv(mdev, dig_in, dgs);
1462 if (rr != dgs) {
1463 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1464 rr, dgs);
1465 return 0;
1466 }
1467 }
1468
1469 data_size -= dgs;
1470
1471 /* optimistically update recv_cnt. if receiving fails below,
1472 * we disconnect anyways, and counters will be reset. */
1473 mdev->recv_cnt += data_size>>9;
1474
1475 bio = req->master_bio;
1476 D_ASSERT(sector == bio->bi_sector);
1477
1478 bio_for_each_segment(bvec, bio, i) {
1479 expect = min_t(int, data_size, bvec->bv_len);
1480 rr = drbd_recv(mdev,
1481 kmap(bvec->bv_page)+bvec->bv_offset,
1482 expect);
1483 kunmap(bvec->bv_page);
1484 if (rr != expect) {
1485 dev_warn(DEV, "short read receiving data reply: "
1486 "read %d expected %d\n",
1487 rr, expect);
1488 return 0;
1489 }
1490 data_size -= rr;
1491 }
1492
1493 if (dgs) {
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001494 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001495 if (memcmp(dig_in, dig_vv, dgs)) {
1496 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1497 return 0;
1498 }
1499 }
1500
1501 D_ASSERT(data_size == 0);
1502 return 1;
1503}
1504
1505/* e_end_resync_block() is called via
1506 * drbd_process_done_ee() by asender only */
1507static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1508{
1509 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1510 sector_t sector = e->sector;
1511 int ok;
1512
1513 D_ASSERT(hlist_unhashed(&e->colision));
1514
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001515 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
Philipp Reisnerb411b362009-09-25 16:07:19 -07001516 drbd_set_in_sync(mdev, sector, e->size);
1517 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1518 } else {
1519 /* Record failure to sync */
1520 drbd_rs_failed_io(mdev, sector, e->size);
1521
1522 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1523 }
1524 dec_unacked(mdev);
1525
1526 return ok;
1527}
1528
1529static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1530{
1531 struct drbd_epoch_entry *e;
1532
1533 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001534 if (!e)
1535 goto fail;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001536
1537 dec_rs_pending(mdev);
1538
Philipp Reisnerb411b362009-09-25 16:07:19 -07001539 inc_unacked(mdev);
1540 /* corresponding dec_unacked() in e_end_resync_block()
1541 * respective _drbd_clear_done_ee */
1542
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001543 e->w.cb = e_end_resync_block;
1544
Philipp Reisnerb411b362009-09-25 16:07:19 -07001545 spin_lock_irq(&mdev->req_lock);
1546 list_add(&e->w.list, &mdev->sync_ee);
1547 spin_unlock_irq(&mdev->req_lock);
1548
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001549 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1550 return TRUE;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001551
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001552 drbd_free_ee(mdev, e);
1553fail:
1554 put_ldev(mdev);
1555 return FALSE;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001556}
1557
1558static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1559{
1560 struct drbd_request *req;
1561 sector_t sector;
1562 unsigned int header_size, data_size;
1563 int ok;
1564 struct p_data *p = (struct p_data *)h;
1565
1566 header_size = sizeof(*p) - sizeof(*h);
1567 data_size = h->length - header_size;
1568
1569 ERR_IF(data_size == 0) return FALSE;
1570
1571 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1572 return FALSE;
1573
1574 sector = be64_to_cpu(p->sector);
1575
1576 spin_lock_irq(&mdev->req_lock);
1577 req = _ar_id_to_req(mdev, p->block_id, sector);
1578 spin_unlock_irq(&mdev->req_lock);
1579 if (unlikely(!req)) {
1580 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1581 return FALSE;
1582 }
1583
1584 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1585 * special casing it there for the various failure cases.
1586 * still no race with drbd_fail_pending_reads */
1587 ok = recv_dless_read(mdev, req, sector, data_size);
1588
1589 if (ok)
1590 req_mod(req, data_received);
1591 /* else: nothing. handled from drbd_disconnect...
1592 * I don't think we may complete this just yet
1593 * in case we are "on-disconnect: freeze" */
1594
1595 return ok;
1596}
1597
1598static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1599{
1600 sector_t sector;
1601 unsigned int header_size, data_size;
1602 int ok;
1603 struct p_data *p = (struct p_data *)h;
1604
1605 header_size = sizeof(*p) - sizeof(*h);
1606 data_size = h->length - header_size;
1607
1608 ERR_IF(data_size == 0) return FALSE;
1609
1610 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1611 return FALSE;
1612
1613 sector = be64_to_cpu(p->sector);
1614 D_ASSERT(p->block_id == ID_SYNCER);
1615
1616 if (get_ldev(mdev)) {
1617 /* data is submitted to disk within recv_resync_read.
1618 * corresponding put_ldev done below on error,
1619 * or in drbd_endio_write_sec. */
1620 ok = recv_resync_read(mdev, sector, data_size);
1621 } else {
1622 if (__ratelimit(&drbd_ratelimit_state))
1623 dev_err(DEV, "Can not write resync data to local disk.\n");
1624
1625 ok = drbd_drain_block(mdev, data_size);
1626
1627 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1628 }
1629
1630 return ok;
1631}
1632
1633/* e_end_block() is called via drbd_process_done_ee().
1634 * this means this function only runs in the asender thread
1635 */
1636static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1637{
1638 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1639 sector_t sector = e->sector;
1640 struct drbd_epoch *epoch;
1641 int ok = 1, pcmd;
1642
1643 if (e->flags & EE_IS_BARRIER) {
1644 epoch = previous_epoch(mdev, e->epoch);
1645 if (epoch)
1646 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1647 }
1648
1649 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001650 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
Philipp Reisnerb411b362009-09-25 16:07:19 -07001651 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1652 mdev->state.conn <= C_PAUSED_SYNC_T &&
1653 e->flags & EE_MAY_SET_IN_SYNC) ?
1654 P_RS_WRITE_ACK : P_WRITE_ACK;
1655 ok &= drbd_send_ack(mdev, pcmd, e);
1656 if (pcmd == P_RS_WRITE_ACK)
1657 drbd_set_in_sync(mdev, sector, e->size);
1658 } else {
1659 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1660 /* we expect it to be marked out of sync anyways...
1661 * maybe assert this? */
1662 }
1663 dec_unacked(mdev);
1664 }
1665 /* we delete from the conflict detection hash _after_ we sent out the
1666 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1667 if (mdev->net_conf->two_primaries) {
1668 spin_lock_irq(&mdev->req_lock);
1669 D_ASSERT(!hlist_unhashed(&e->colision));
1670 hlist_del_init(&e->colision);
1671 spin_unlock_irq(&mdev->req_lock);
1672 } else {
1673 D_ASSERT(hlist_unhashed(&e->colision));
1674 }
1675
1676 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1677
1678 return ok;
1679}
1680
1681static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1682{
1683 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1684 int ok = 1;
1685
1686 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1687 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1688
1689 spin_lock_irq(&mdev->req_lock);
1690 D_ASSERT(!hlist_unhashed(&e->colision));
1691 hlist_del_init(&e->colision);
1692 spin_unlock_irq(&mdev->req_lock);
1693
1694 dec_unacked(mdev);
1695
1696 return ok;
1697}
1698
1699/* Called from receive_Data.
1700 * Synchronize packets on sock with packets on msock.
1701 *
1702 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1703 * packet traveling on msock, they are still processed in the order they have
1704 * been sent.
1705 *
1706 * Note: we don't care for Ack packets overtaking P_DATA packets.
1707 *
1708 * In case packet_seq is larger than mdev->peer_seq number, there are
1709 * outstanding packets on the msock. We wait for them to arrive.
1710 * In case we are the logically next packet, we update mdev->peer_seq
1711 * ourselves. Correctly handles 32bit wrap around.
1712 *
1713 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1714 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1715 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1716 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1717 *
1718 * returns 0 if we may process the packet,
1719 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1720static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1721{
1722 DEFINE_WAIT(wait);
1723 unsigned int p_seq;
1724 long timeout;
1725 int ret = 0;
1726 spin_lock(&mdev->peer_seq_lock);
1727 for (;;) {
1728 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1729 if (seq_le(packet_seq, mdev->peer_seq+1))
1730 break;
1731 if (signal_pending(current)) {
1732 ret = -ERESTARTSYS;
1733 break;
1734 }
1735 p_seq = mdev->peer_seq;
1736 spin_unlock(&mdev->peer_seq_lock);
1737 timeout = schedule_timeout(30*HZ);
1738 spin_lock(&mdev->peer_seq_lock);
1739 if (timeout == 0 && p_seq == mdev->peer_seq) {
1740 ret = -ETIMEDOUT;
1741 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1742 break;
1743 }
1744 }
1745 finish_wait(&mdev->seq_wait, &wait);
1746 if (mdev->peer_seq+1 == packet_seq)
1747 mdev->peer_seq++;
1748 spin_unlock(&mdev->peer_seq_lock);
1749 return ret;
1750}
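
/*
 * Illustrative sketch, not part of the driver: the wrap-around safe
 * comparison that seq_le() used above is assumed to implement.  Casting the
 * difference to a signed 32bit value keeps "a <= b" well defined across the
 * 32bit wrap, as long as the two sequence numbers are less than 2^31 apart,
 * which the bandwidth estimate in the comment above argues is always the case.
 */
static inline int example_seq_le(u32 a, u32 b)
{
	return (s32)(a - b) <= 0;	/* true iff a is not newer than b */
}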
1751
1752/* mirrored write */
1753static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1754{
1755 sector_t sector;
1756 struct drbd_epoch_entry *e;
1757 struct p_data *p = (struct p_data *)h;
1758 int header_size, data_size;
1759 int rw = WRITE;
1760 u32 dp_flags;
1761
1762 header_size = sizeof(*p) - sizeof(*h);
1763 data_size = h->length - header_size;
1764
1765 ERR_IF(data_size == 0) return FALSE;
1766
1767 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1768 return FALSE;
1769
1770 if (!get_ldev(mdev)) {
1771 if (__ratelimit(&drbd_ratelimit_state))
1772 dev_err(DEV, "Can not write mirrored data block "
1773 "to local disk.\n");
1774 spin_lock(&mdev->peer_seq_lock);
1775 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1776 mdev->peer_seq++;
1777 spin_unlock(&mdev->peer_seq_lock);
1778
1779 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1780 atomic_inc(&mdev->current_epoch->epoch_size);
1781 return drbd_drain_block(mdev, data_size);
1782 }
1783
1784 /* get_ldev(mdev) successful.
1785 * Corresponding put_ldev done either below (on various errors),
1786 * or in drbd_endio_write_sec, if we successfully submit the data at
1787 * the end of this function. */
1788
1789 sector = be64_to_cpu(p->sector);
1790 e = read_in_block(mdev, p->block_id, sector, data_size);
1791 if (!e) {
1792 put_ldev(mdev);
1793 return FALSE;
1794 }
1795
Philipp Reisnerb411b362009-09-25 16:07:19 -07001796 e->w.cb = e_end_block;
1797
1798 spin_lock(&mdev->epoch_lock);
1799 e->epoch = mdev->current_epoch;
1800 atomic_inc(&e->epoch->epoch_size);
1801 atomic_inc(&e->epoch->active);
1802
1803 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1804 struct drbd_epoch *epoch;
1805 /* Issue a barrier if we start a new epoch, and the previous epoch
 1806 was not an epoch containing a single request which already was
1807 a Barrier. */
1808 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1809 if (epoch == e->epoch) {
1810 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001811 rw |= (1<<BIO_RW_BARRIER);
1812 e->flags |= EE_IS_BARRIER;
1813 } else {
1814 if (atomic_read(&epoch->epoch_size) > 1 ||
1815 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1816 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001817 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
Philipp Reisnerb411b362009-09-25 16:07:19 -07001818 rw |= (1<<BIO_RW_BARRIER);
1819 e->flags |= EE_IS_BARRIER;
1820 }
1821 }
1822 }
1823 spin_unlock(&mdev->epoch_lock);
1824
1825 dp_flags = be32_to_cpu(p->dp_flags);
1826 if (dp_flags & DP_HARDBARRIER) {
1827 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1828 /* rw |= (1<<BIO_RW_BARRIER); */
1829 }
1830 if (dp_flags & DP_RW_SYNC)
1831 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1832 if (dp_flags & DP_MAY_SET_IN_SYNC)
1833 e->flags |= EE_MAY_SET_IN_SYNC;
1834
1835 /* I'm the receiver, I do hold a net_cnt reference. */
1836 if (!mdev->net_conf->two_primaries) {
1837 spin_lock_irq(&mdev->req_lock);
1838 } else {
1839 /* don't get the req_lock yet,
1840 * we may sleep in drbd_wait_peer_seq */
1841 const int size = e->size;
1842 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1843 DEFINE_WAIT(wait);
1844 struct drbd_request *i;
1845 struct hlist_node *n;
1846 struct hlist_head *slot;
1847 int first;
1848
1849 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1850 BUG_ON(mdev->ee_hash == NULL);
1851 BUG_ON(mdev->tl_hash == NULL);
1852
1853 /* conflict detection and handling:
1854 * 1. wait on the sequence number,
1855 * in case this data packet overtook ACK packets.
1856 * 2. check our hash tables for conflicting requests.
1857 * we only need to walk the tl_hash, since an ee can not
 1858 * have a conflict with another ee: on the submitting
1859 * node, the corresponding req had already been conflicting,
1860 * and a conflicting req is never sent.
1861 *
1862 * Note: for two_primaries, we are protocol C,
1863 * so there cannot be any request that is DONE
1864 * but still on the transfer log.
1865 *
1866 * unconditionally add to the ee_hash.
1867 *
1868 * if no conflicting request is found:
1869 * submit.
1870 *
1871 * if any conflicting request is found
1872 * that has not yet been acked,
1873 * AND I have the "discard concurrent writes" flag:
1874 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1875 *
1876 * if any conflicting request is found:
1877 * block the receiver, waiting on misc_wait
1878 * until no more conflicting requests are there,
1879 * or we get interrupted (disconnect).
1880 *
1881 * we do not just write after local io completion of those
1882 * requests, but only after req is done completely, i.e.
1883 * we wait for the P_DISCARD_ACK to arrive!
1884 *
1885 * then proceed normally, i.e. submit.
1886 */
1887 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1888 goto out_interrupted;
1889
1890 spin_lock_irq(&mdev->req_lock);
1891
1892 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1893
1894#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1895 slot = tl_hash_slot(mdev, sector);
1896 first = 1;
1897 for (;;) {
1898 int have_unacked = 0;
1899 int have_conflict = 0;
1900 prepare_to_wait(&mdev->misc_wait, &wait,
1901 TASK_INTERRUPTIBLE);
1902 hlist_for_each_entry(i, n, slot, colision) {
1903 if (OVERLAPS) {
1904 /* only ALERT on first iteration,
1905 * we may be woken up early... */
1906 if (first)
1907 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1908 " new: %llus +%u; pending: %llus +%u\n",
1909 current->comm, current->pid,
1910 (unsigned long long)sector, size,
1911 (unsigned long long)i->sector, i->size);
1912 if (i->rq_state & RQ_NET_PENDING)
1913 ++have_unacked;
1914 ++have_conflict;
1915 }
1916 }
1917#undef OVERLAPS
1918 if (!have_conflict)
1919 break;
1920
1921 /* Discard Ack only for the _first_ iteration */
1922 if (first && discard && have_unacked) {
1923 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1924 (unsigned long long)sector);
1925 inc_unacked(mdev);
1926 e->w.cb = e_send_discard_ack;
1927 list_add_tail(&e->w.list, &mdev->done_ee);
1928
1929 spin_unlock_irq(&mdev->req_lock);
1930
1931 /* we could probably send that P_DISCARD_ACK ourselves,
1932 * but I don't like the receiver using the msock */
1933
1934 put_ldev(mdev);
1935 wake_asender(mdev);
1936 finish_wait(&mdev->misc_wait, &wait);
1937 return TRUE;
1938 }
1939
1940 if (signal_pending(current)) {
1941 hlist_del_init(&e->colision);
1942
1943 spin_unlock_irq(&mdev->req_lock);
1944
1945 finish_wait(&mdev->misc_wait, &wait);
1946 goto out_interrupted;
1947 }
1948
1949 spin_unlock_irq(&mdev->req_lock);
1950 if (first) {
1951 first = 0;
1952 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1953 "sec=%llus\n", (unsigned long long)sector);
1954 } else if (discard) {
1955 /* we had none on the first iteration.
1956 * there must be none now. */
1957 D_ASSERT(have_unacked == 0);
1958 }
1959 schedule();
1960 spin_lock_irq(&mdev->req_lock);
1961 }
1962 finish_wait(&mdev->misc_wait, &wait);
1963 }
1964
1965 list_add(&e->w.list, &mdev->active_ee);
1966 spin_unlock_irq(&mdev->req_lock);
1967
1968 switch (mdev->net_conf->wire_protocol) {
1969 case DRBD_PROT_C:
1970 inc_unacked(mdev);
1971 /* corresponding dec_unacked() in e_end_block()
1972 * respective _drbd_clear_done_ee */
1973 break;
1974 case DRBD_PROT_B:
1975 /* I really don't like it that the receiver thread
1976 * sends on the msock, but anyways */
1977 drbd_send_ack(mdev, P_RECV_ACK, e);
1978 break;
1979 case DRBD_PROT_A:
1980 /* nothing to do */
1981 break;
1982 }
1983
1984 if (mdev->state.pdsk == D_DISKLESS) {
1985 /* In case we have the only disk of the cluster, */
1986 drbd_set_out_of_sync(mdev, e->sector, e->size);
1987 e->flags |= EE_CALL_AL_COMPLETE_IO;
1988 drbd_al_begin_io(mdev, e->sector);
1989 }
1990
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001991 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1992 return TRUE;
Philipp Reisnerb411b362009-09-25 16:07:19 -07001993
1994out_interrupted:
1995 /* yes, the epoch_size now is imbalanced.
1996 * but we drop the connection anyways, so we don't have a chance to
1997 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1998 put_ldev(mdev);
1999 drbd_free_ee(mdev, e);
2000 return FALSE;
2001}
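
/*
 * Illustrative sketch, not part of the driver: the interval test behind the
 * OVERLAPS macro in the conflict detection above.  The real overlaps() helper
 * is defined elsewhere; this is the obvious "two sector ranges intersect"
 * check it is assumed to perform.  Sizes are byte counts, sectors are 512
 * byte units, hence the >>9 shifts.
 */
static inline int example_ranges_overlap(sector_t s1, int size1,
					 sector_t s2, int size2)
{
	return s1 < s2 + (size2 >> 9) && s2 < s1 + (size1 >> 9);
}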
2002
2003static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2004{
2005 sector_t sector;
2006 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2007 struct drbd_epoch_entry *e;
2008 struct digest_info *di = NULL;
2009 int size, digest_size;
2010 unsigned int fault_type;
2011 struct p_block_req *p =
2012 (struct p_block_req *)h;
2013 const int brps = sizeof(*p)-sizeof(*h);
2014
2015 if (drbd_recv(mdev, h->payload, brps) != brps)
2016 return FALSE;
2017
2018 sector = be64_to_cpu(p->sector);
2019 size = be32_to_cpu(p->blksize);
2020
2021 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2022 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2023 (unsigned long long)sector, size);
2024 return FALSE;
2025 }
2026 if (sector + (size>>9) > capacity) {
2027 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2028 (unsigned long long)sector, size);
2029 return FALSE;
2030 }
2031
2032 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2033 if (__ratelimit(&drbd_ratelimit_state))
2034 dev_err(DEV, "Can not satisfy peer's read request, "
2035 "no local data.\n");
2036 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2037 P_NEG_RS_DREPLY , p);
Lars Ellenbergc3470cd2010-04-01 16:57:19 +02002038 return drbd_drain_block(mdev, h->length - brps);
Philipp Reisnerb411b362009-09-25 16:07:19 -07002039 }
2040
2041 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2042 * "criss-cross" setup, that might cause write-out on some other DRBD,
2043 * which in turn might block on the other node at this very place. */
2044 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2045 if (!e) {
2046 put_ldev(mdev);
2047 return FALSE;
2048 }
2049
Philipp Reisnerb411b362009-09-25 16:07:19 -07002050 switch (h->command) {
2051 case P_DATA_REQUEST:
2052 e->w.cb = w_e_end_data_req;
2053 fault_type = DRBD_FAULT_DT_RD;
2054 break;
2055 case P_RS_DATA_REQUEST:
2056 e->w.cb = w_e_end_rsdata_req;
2057 fault_type = DRBD_FAULT_RS_RD;
 2058 /* Eventually this should become asynchronous. Currently it
2059 * blocks the whole receiver just to delay the reading of a
2060 * resync data block.
2061 * the drbd_work_queue mechanism is made for this...
2062 */
2063 if (!drbd_rs_begin_io(mdev, sector)) {
2064 /* we have been interrupted,
2065 * probably connection lost! */
2066 D_ASSERT(signal_pending(current));
2067 goto out_free_e;
2068 }
2069 break;
2070
2071 case P_OV_REPLY:
2072 case P_CSUM_RS_REQUEST:
2073 fault_type = DRBD_FAULT_RS_RD;
 2074 digest_size = h->length - brps;
2075 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2076 if (!di)
2077 goto out_free_e;
2078
2079 di->digest_size = digest_size;
2080 di->digest = (((char *)di)+sizeof(struct digest_info));
2081
2082 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2083 goto out_free_e;
2084
2085 e->block_id = (u64)(unsigned long)di;
2086 if (h->command == P_CSUM_RS_REQUEST) {
2087 D_ASSERT(mdev->agreed_pro_version >= 89);
2088 e->w.cb = w_e_end_csum_rs_req;
2089 } else if (h->command == P_OV_REPLY) {
2090 e->w.cb = w_e_end_ov_reply;
2091 dec_rs_pending(mdev);
2092 break;
2093 }
2094
2095 if (!drbd_rs_begin_io(mdev, sector)) {
2096 /* we have been interrupted, probably connection lost! */
2097 D_ASSERT(signal_pending(current));
2098 goto out_free_e;
2099 }
2100 break;
2101
2102 case P_OV_REQUEST:
2103 if (mdev->state.conn >= C_CONNECTED &&
2104 mdev->state.conn != C_VERIFY_T)
2105 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2106 drbd_conn_str(mdev->state.conn));
2107 if (mdev->ov_start_sector == ~(sector_t)0 &&
2108 mdev->agreed_pro_version >= 90) {
2109 mdev->ov_start_sector = sector;
2110 mdev->ov_position = sector;
2111 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2112 dev_info(DEV, "Online Verify start sector: %llu\n",
2113 (unsigned long long)sector);
2114 }
2115 e->w.cb = w_e_end_ov_req;
2116 fault_type = DRBD_FAULT_RS_RD;
2117 /* Eventually this should become asynchronous. Currently it
2118 * blocks the whole receiver just to delay the reading of a
2119 * resync data block.
2120 * the drbd_work_queue mechanism is made for this...
2121 */
2122 if (!drbd_rs_begin_io(mdev, sector)) {
2123 /* we have been interrupted,
2124 * probably connection lost! */
2125 D_ASSERT(signal_pending(current));
2126 goto out_free_e;
2127 }
2128 break;
2129
2130
2131 default:
2132 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2133 cmdname(h->command));
2134 fault_type = DRBD_FAULT_MAX;
2135 }
2136
2137 spin_lock_irq(&mdev->req_lock);
2138 list_add(&e->w.list, &mdev->read_ee);
2139 spin_unlock_irq(&mdev->req_lock);
2140
2141 inc_unacked(mdev);
2142
Lars Ellenberg45bb9122010-05-14 17:10:48 +02002143 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2144 return TRUE;
Philipp Reisnerb411b362009-09-25 16:07:19 -07002145
2146out_free_e:
2147 kfree(di);
2148 put_ldev(mdev);
2149 drbd_free_ee(mdev, e);
2150 return FALSE;
2151}
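
/*
 * Illustrative sketch, not part of the driver: the sanity checks applied to a
 * peer's request at the top of receive_DataRequest(), as a standalone
 * predicate.  A request must be a non-empty multiple of 512 bytes, must not
 * exceed DRBD_MAX_SEGMENT_SIZE, and must not reach past the end of the device.
 */
static inline int example_valid_peer_request(sector_t sector, int size,
					     sector_t capacity)
{
	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE)
		return 0;	/* zero sized, unaligned or oversized */
	if (sector + (size >> 9) > capacity)
		return 0;	/* runs past the end of the device */
	return 1;
}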
2152
2153static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2154{
2155 int self, peer, rv = -100;
2156 unsigned long ch_self, ch_peer;
2157
2158 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2159 peer = mdev->p_uuid[UI_BITMAP] & 1;
2160
2161 ch_peer = mdev->p_uuid[UI_SIZE];
2162 ch_self = mdev->comm_bm_set;
2163
2164 switch (mdev->net_conf->after_sb_0p) {
2165 case ASB_CONSENSUS:
2166 case ASB_DISCARD_SECONDARY:
2167 case ASB_CALL_HELPER:
2168 dev_err(DEV, "Configuration error.\n");
2169 break;
2170 case ASB_DISCONNECT:
2171 break;
2172 case ASB_DISCARD_YOUNGER_PRI:
2173 if (self == 0 && peer == 1) {
2174 rv = -1;
2175 break;
2176 }
2177 if (self == 1 && peer == 0) {
2178 rv = 1;
2179 break;
2180 }
2181 /* Else fall through to one of the other strategies... */
2182 case ASB_DISCARD_OLDER_PRI:
2183 if (self == 0 && peer == 1) {
2184 rv = 1;
2185 break;
2186 }
2187 if (self == 1 && peer == 0) {
2188 rv = -1;
2189 break;
2190 }
2191 /* Else fall through to one of the other strategies... */
Lars Ellenbergad19bf62009-10-14 09:36:49 +02002192 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
Philipp Reisnerb411b362009-09-25 16:07:19 -07002193 "Using discard-least-changes instead\n");
2194 case ASB_DISCARD_ZERO_CHG:
2195 if (ch_peer == 0 && ch_self == 0) {
2196 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2197 ? -1 : 1;
2198 break;
2199 } else {
2200 if (ch_peer == 0) { rv = 1; break; }
2201 if (ch_self == 0) { rv = -1; break; }
2202 }
2203 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2204 break;
2205 case ASB_DISCARD_LEAST_CHG:
2206 if (ch_self < ch_peer)
2207 rv = -1;
2208 else if (ch_self > ch_peer)
2209 rv = 1;
2210 else /* ( ch_self == ch_peer ) */
2211 /* Well, then use something else. */
2212 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2213 ? -1 : 1;
2214 break;
2215 case ASB_DISCARD_LOCAL:
2216 rv = -1;
2217 break;
2218 case ASB_DISCARD_REMOTE:
2219 rv = 1;
2220 }
2221
2222 return rv;
2223}
2224
2225static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2226{
2227 int self, peer, hg, rv = -100;
2228
2229 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2230 peer = mdev->p_uuid[UI_BITMAP] & 1;
2231
2232 switch (mdev->net_conf->after_sb_1p) {
2233 case ASB_DISCARD_YOUNGER_PRI:
2234 case ASB_DISCARD_OLDER_PRI:
2235 case ASB_DISCARD_LEAST_CHG:
2236 case ASB_DISCARD_LOCAL:
2237 case ASB_DISCARD_REMOTE:
2238 dev_err(DEV, "Configuration error.\n");
2239 break;
2240 case ASB_DISCONNECT:
2241 break;
2242 case ASB_CONSENSUS:
2243 hg = drbd_asb_recover_0p(mdev);
2244 if (hg == -1 && mdev->state.role == R_SECONDARY)
2245 rv = hg;
2246 if (hg == 1 && mdev->state.role == R_PRIMARY)
2247 rv = hg;
2248 break;
2249 case ASB_VIOLENTLY:
2250 rv = drbd_asb_recover_0p(mdev);
2251 break;
2252 case ASB_DISCARD_SECONDARY:
2253 return mdev->state.role == R_PRIMARY ? 1 : -1;
2254 case ASB_CALL_HELPER:
2255 hg = drbd_asb_recover_0p(mdev);
2256 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2257 self = drbd_set_role(mdev, R_SECONDARY, 0);
2258 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2259 * we might be here in C_WF_REPORT_PARAMS which is transient.
2260 * we do not need to wait for the after state change work either. */
2261 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2262 if (self != SS_SUCCESS) {
2263 drbd_khelper(mdev, "pri-lost-after-sb");
2264 } else {
2265 dev_warn(DEV, "Successfully gave up primary role.\n");
2266 rv = hg;
2267 }
2268 } else
2269 rv = hg;
2270 }
2271
2272 return rv;
2273}
2274
2275static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2276{
2277 int self, peer, hg, rv = -100;
2278
2279 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2280 peer = mdev->p_uuid[UI_BITMAP] & 1;
2281
2282 switch (mdev->net_conf->after_sb_2p) {
2283 case ASB_DISCARD_YOUNGER_PRI:
2284 case ASB_DISCARD_OLDER_PRI:
2285 case ASB_DISCARD_LEAST_CHG:
2286 case ASB_DISCARD_LOCAL:
2287 case ASB_DISCARD_REMOTE:
2288 case ASB_CONSENSUS:
2289 case ASB_DISCARD_SECONDARY:
2290 dev_err(DEV, "Configuration error.\n");
2291 break;
2292 case ASB_VIOLENTLY:
2293 rv = drbd_asb_recover_0p(mdev);
2294 break;
2295 case ASB_DISCONNECT:
2296 break;
2297 case ASB_CALL_HELPER:
2298 hg = drbd_asb_recover_0p(mdev);
2299 if (hg == -1) {
2300 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2301 * we might be here in C_WF_REPORT_PARAMS which is transient.
2302 * we do not need to wait for the after state change work either. */
2303 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2304 if (self != SS_SUCCESS) {
2305 drbd_khelper(mdev, "pri-lost-after-sb");
2306 } else {
2307 dev_warn(DEV, "Successfully gave up primary role.\n");
2308 rv = hg;
2309 }
2310 } else
2311 rv = hg;
2312 }
2313
2314 return rv;
2315}
2316
2317static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2318 u64 bits, u64 flags)
2319{
2320 if (!uuid) {
2321 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2322 return;
2323 }
2324 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2325 text,
2326 (unsigned long long)uuid[UI_CURRENT],
2327 (unsigned long long)uuid[UI_BITMAP],
2328 (unsigned long long)uuid[UI_HISTORY_START],
2329 (unsigned long long)uuid[UI_HISTORY_END],
2330 (unsigned long long)bits,
2331 (unsigned long long)flags);
2332}
2333
2334/*
2335 100 after split brain try auto recover
2336 2 C_SYNC_SOURCE set BitMap
2337 1 C_SYNC_SOURCE use BitMap
2338 0 no Sync
2339 -1 C_SYNC_TARGET use BitMap
2340 -2 C_SYNC_TARGET set BitMap
2341 -100 after split brain, disconnect
2342-1000 unrelated data
2343 */
2344static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2345{
2346 u64 self, peer;
2347 int i, j;
2348
2349 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2350 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2351
2352 *rule_nr = 10;
2353 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2354 return 0;
2355
2356 *rule_nr = 20;
2357 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2358 peer != UUID_JUST_CREATED)
2359 return -2;
2360
2361 *rule_nr = 30;
2362 if (self != UUID_JUST_CREATED &&
2363 (peer == UUID_JUST_CREATED || peer == (u64)0))
2364 return 2;
2365
2366 if (self == peer) {
2367 int rct, dc; /* roles at crash time */
2368
2369 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2370
2371 if (mdev->agreed_pro_version < 91)
2372 return -1001;
2373
2374 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2375 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2376 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2377 drbd_uuid_set_bm(mdev, 0UL);
2378
2379 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2380 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2381 *rule_nr = 34;
2382 } else {
2383 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2384 *rule_nr = 36;
2385 }
2386
2387 return 1;
2388 }
2389
2390 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2391
2392 if (mdev->agreed_pro_version < 91)
2393 return -1001;
2394
2395 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2396 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2397 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2398
2399 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2400 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2401 mdev->p_uuid[UI_BITMAP] = 0UL;
2402
2403 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2404 *rule_nr = 35;
2405 } else {
2406 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2407 *rule_nr = 37;
2408 }
2409
2410 return -1;
2411 }
2412
2413 /* Common power [off|failure] */
2414 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2415 (mdev->p_uuid[UI_FLAGS] & 2);
2416 /* lowest bit is set when we were primary,
2417 * next bit (weight 2) is set when peer was primary */
2418 *rule_nr = 40;
2419
2420 switch (rct) {
2421 case 0: /* !self_pri && !peer_pri */ return 0;
2422 case 1: /* self_pri && !peer_pri */ return 1;
2423 case 2: /* !self_pri && peer_pri */ return -1;
2424 case 3: /* self_pri && peer_pri */
2425 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2426 return dc ? -1 : 1;
2427 }
2428 }
2429
2430 *rule_nr = 50;
2431 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2432 if (self == peer)
2433 return -1;
2434
2435 *rule_nr = 51;
2436 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2437 if (self == peer) {
2438 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2439 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2440 if (self == peer) {
 2441 /* The last P_SYNC_UUID did not get through. Undo the modifications
 2442 the peer made to its UUIDs when it last started a resync as sync source. */
2443
2444 if (mdev->agreed_pro_version < 91)
2445 return -1001;
2446
2447 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2448 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2449 return -1;
2450 }
2451 }
2452
2453 *rule_nr = 60;
2454 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2455 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2456 peer = mdev->p_uuid[i] & ~((u64)1);
2457 if (self == peer)
2458 return -2;
2459 }
2460
2461 *rule_nr = 70;
2462 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2463 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2464 if (self == peer)
2465 return 1;
2466
2467 *rule_nr = 71;
2468 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2469 if (self == peer) {
2470 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2471 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2472 if (self == peer) {
 2473 /* The last P_SYNC_UUID did not get through. Undo the modifications
 2474 we made to our own UUIDs when we last started a resync as sync source. */
2475
2476 if (mdev->agreed_pro_version < 91)
2477 return -1001;
2478
2479 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2480 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2481
2482 dev_info(DEV, "Undid last start of resync:\n");
2483
2484 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2485 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2486
2487 return 1;
2488 }
2489 }
2490
2491
2492 *rule_nr = 80;
Philipp Reisnerd8c2a362009-11-18 15:52:51 +01002493 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
Philipp Reisnerb411b362009-09-25 16:07:19 -07002494 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2495 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2496 if (self == peer)
2497 return 2;
2498 }
2499
2500 *rule_nr = 90;
2501 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2502 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2503 if (self == peer && self != ((u64)0))
2504 return 100;
2505
2506 *rule_nr = 100;
2507 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2508 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2509 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2510 peer = mdev->p_uuid[j] & ~((u64)1);
2511 if (self == peer)
2512 return -100;
2513 }
2514 }
2515
2516 return -1000;
2517}
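
/*
 * Illustrative sketch, not part of the driver: throughout drbd_uuid_compare()
 * UUIDs are compared with the lowest bit masked off, because that bit carries
 * a flag rather than identity.  This helper is an assumed equivalent of the
 * open coded "& ~((u64)1)" comparisons above; the name is made up.
 */
static inline int example_uuid_equal(u64 a, u64 b)
{
	return (a & ~((u64)1)) == (b & ~((u64)1));
}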
2518
2519/* drbd_sync_handshake() returns the new conn state on success, or
2520 CONN_MASK (-1) on failure.
2521 */
2522static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2523 enum drbd_disk_state peer_disk) __must_hold(local)
2524{
2525 int hg, rule_nr;
2526 enum drbd_conns rv = C_MASK;
2527 enum drbd_disk_state mydisk;
2528
2529 mydisk = mdev->state.disk;
2530 if (mydisk == D_NEGOTIATING)
2531 mydisk = mdev->new_state_tmp.disk;
2532
2533 dev_info(DEV, "drbd_sync_handshake:\n");
2534 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2535 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2536 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2537
2538 hg = drbd_uuid_compare(mdev, &rule_nr);
2539
2540 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2541
2542 if (hg == -1000) {
2543 dev_alert(DEV, "Unrelated data, aborting!\n");
2544 return C_MASK;
2545 }
2546 if (hg == -1001) {
2547 dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2548 return C_MASK;
2549 }
2550
2551 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2552 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2553 int f = (hg == -100) || abs(hg) == 2;
2554 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2555 if (f)
2556 hg = hg*2;
2557 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2558 hg > 0 ? "source" : "target");
2559 }
2560
Adam Gandelman3a11a482010-04-08 16:48:23 -07002561 if (abs(hg) == 100)
2562 drbd_khelper(mdev, "initial-split-brain");
2563
Philipp Reisnerb411b362009-09-25 16:07:19 -07002564 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2565 int pcount = (mdev->state.role == R_PRIMARY)
2566 + (peer_role == R_PRIMARY);
2567 int forced = (hg == -100);
2568
2569 switch (pcount) {
2570 case 0:
2571 hg = drbd_asb_recover_0p(mdev);
2572 break;
2573 case 1:
2574 hg = drbd_asb_recover_1p(mdev);
2575 break;
2576 case 2:
2577 hg = drbd_asb_recover_2p(mdev);
2578 break;
2579 }
2580 if (abs(hg) < 100) {
2581 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2582 "automatically solved. Sync from %s node\n",
2583 pcount, (hg < 0) ? "peer" : "this");
2584 if (forced) {
2585 dev_warn(DEV, "Doing a full sync, since"
2586 " UUIDs where ambiguous.\n");
2587 hg = hg*2;
2588 }
2589 }
2590 }
2591
2592 if (hg == -100) {
2593 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2594 hg = -1;
2595 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2596 hg = 1;
2597
2598 if (abs(hg) < 100)
2599 dev_warn(DEV, "Split-Brain detected, manually solved. "
2600 "Sync from %s node\n",
2601 (hg < 0) ? "peer" : "this");
2602 }
2603
2604 if (hg == -100) {
Lars Ellenberg580b9762010-02-26 23:15:23 +01002605 /* FIXME this log message is not correct if we end up here
2606 * after an attempted attach on a diskless node.
2607 * We just refuse to attach -- well, we drop the "connection"
2608 * to that disk, in a way... */
Adam Gandelman3a11a482010-04-08 16:48:23 -07002609 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
Philipp Reisnerb411b362009-09-25 16:07:19 -07002610 drbd_khelper(mdev, "split-brain");
2611 return C_MASK;
2612 }
2613
2614 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2615 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2616 return C_MASK;
2617 }
2618
2619 if (hg < 0 && /* by intention we do not use mydisk here. */
2620 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2621 switch (mdev->net_conf->rr_conflict) {
2622 case ASB_CALL_HELPER:
2623 drbd_khelper(mdev, "pri-lost");
2624 /* fall through */
2625 case ASB_DISCONNECT:
2626 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2627 return C_MASK;
2628 case ASB_VIOLENTLY:
2629 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2630 "assumption\n");
2631 }
2632 }
2633
Philipp Reisnercf14c2e2010-02-02 21:03:50 +01002634 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2635 if (hg == 0)
2636 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2637 else
2638 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2639 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2640 abs(hg) >= 2 ? "full" : "bit-map based");
2641 return C_MASK;
2642 }
2643
Philipp Reisnerb411b362009-09-25 16:07:19 -07002644 if (abs(hg) >= 2) {
2645 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2646 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2647 return C_MASK;
2648 }
2649
2650 if (hg > 0) { /* become sync source. */
2651 rv = C_WF_BITMAP_S;
2652 } else if (hg < 0) { /* become sync target */
2653 rv = C_WF_BITMAP_T;
2654 } else {
2655 rv = C_CONNECTED;
2656 if (drbd_bm_total_weight(mdev)) {
2657 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2658 drbd_bm_total_weight(mdev));
2659 }
2660 }
2661
2662 return rv;
2663}
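
/*
 * Illustrative sketch, not part of the driver: how the handshake result hg
 * maps onto the next connection state, mirroring the tail of
 * drbd_sync_handshake() above.  A positive hg makes us sync source, a
 * negative one sync target, zero means plain Connected; the magnitude
 * (abs(hg) >= 2) only decides whether a full sync is set up beforehand.
 */
static inline enum drbd_conns example_conn_from_hg(int hg)
{
	if (hg > 0)
		return C_WF_BITMAP_S;	/* we become sync source */
	if (hg < 0)
		return C_WF_BITMAP_T;	/* we become sync target */
	return C_CONNECTED;		/* no resync necessary */
}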
2664
2665/* returns 1 if invalid */
2666static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2667{
2668 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2669 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2670 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2671 return 0;
2672
2673 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2674 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2675 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2676 return 1;
2677
2678 /* everything else is valid if they are equal on both sides. */
2679 if (peer == self)
2680 return 0;
2681
 2682 /* everything else is invalid. */
2683 return 1;
2684}
2685
2686static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2687{
2688 struct p_protocol *p = (struct p_protocol *)h;
2689 int header_size, data_size;
2690 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
Philipp Reisnercf14c2e2010-02-02 21:03:50 +01002691 int p_want_lose, p_two_primaries, cf;
Philipp Reisnerb411b362009-09-25 16:07:19 -07002692 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2693
2694 header_size = sizeof(*p) - sizeof(*h);
2695 data_size = h->length - header_size;
2696
2697 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2698 return FALSE;
2699
2700 p_proto = be32_to_cpu(p->protocol);
2701 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2702 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2703 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
Philipp Reisnerb411b362009-09-25 16:07:19 -07002704 p_two_primaries = be32_to_cpu(p->two_primaries);
Philipp Reisnercf14c2e2010-02-02 21:03:50 +01002705 cf = be32_to_cpu(p->conn_flags);
2706 p_want_lose = cf & CF_WANT_LOSE;
2707
2708 clear_bit(CONN_DRY_RUN, &mdev->flags);
2709
2710 if (cf & CF_DRY_RUN)
2711 set_bit(CONN_DRY_RUN, &mdev->flags);
Philipp Reisnerb411b362009-09-25 16:07:19 -07002712
2713 if (p_proto != mdev->net_conf->wire_protocol) {
2714 dev_err(DEV, "incompatible communication protocols\n");
2715 goto disconnect;
2716 }
2717
2718 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2719 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2720 goto disconnect;
2721 }
2722
2723 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2724 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2725 goto disconnect;
2726 }
2727
2728 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2729 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2730 goto disconnect;
2731 }
2732
2733 if (p_want_lose && mdev->net_conf->want_lose) {
2734 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2735 goto disconnect;
2736 }
2737
2738 if (p_two_primaries != mdev->net_conf->two_primaries) {
2739 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2740 goto disconnect;
2741 }
2742
2743 if (mdev->agreed_pro_version >= 87) {
2744 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2745
2746 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2747 return FALSE;
2748
2749 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2750 if (strcmp(p_integrity_alg, my_alg)) {
2751 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2752 goto disconnect;
2753 }
2754 dev_info(DEV, "data-integrity-alg: %s\n",
2755 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2756 }
2757
2758 return TRUE;
2759
2760disconnect:
2761 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2762 return FALSE;
2763}
2764
2765/* helper function
2766 * input: alg name, feature name
2767 * return: NULL (alg name was "")
2768 * ERR_PTR(error) if something goes wrong
2769 * or the crypto hash ptr, if it worked out ok. */
2770struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2771 const char *alg, const char *name)
2772{
2773 struct crypto_hash *tfm;
2774
2775 if (!alg[0])
2776 return NULL;
2777
2778 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2779 if (IS_ERR(tfm)) {
2780 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2781 alg, name, PTR_ERR(tfm));
2782 return tfm;
2783 }
2784 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2785 crypto_free_hash(tfm);
2786 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2787 return ERR_PTR(-EINVAL);
2788 }
2789 return tfm;
2790}
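
/*
 * Illustrative sketch, not part of the driver: the three result classes a
 * caller of drbd_crypto_alloc_digest_safe() has to distinguish, per the
 * comment above.  receive_SyncParam() below does the same triage inline;
 * the helper name here is made up for the example.
 */
static inline struct crypto_hash *example_get_digest(struct drbd_conf *mdev,
						     const char *alg)
{
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "verify-alg");
	if (tfm == NULL)
		return NULL;		/* alg was "": feature simply not in use */
	if (IS_ERR(tfm))
		return NULL;		/* allocation failed, already logged */
	return tfm;			/* usable transform, caller owns it now */
}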
2791
2792static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2793{
2794 int ok = TRUE;
2795 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2796 unsigned int header_size, data_size, exp_max_sz;
2797 struct crypto_hash *verify_tfm = NULL;
2798 struct crypto_hash *csums_tfm = NULL;
2799 const int apv = mdev->agreed_pro_version;
2800
2801 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2802 : apv == 88 ? sizeof(struct p_rs_param)
2803 + SHARED_SECRET_MAX
2804 : /* 89 */ sizeof(struct p_rs_param_89);
2805
2806 if (h->length > exp_max_sz) {
2807 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2808 h->length, exp_max_sz);
2809 return FALSE;
2810 }
2811
2812 if (apv <= 88) {
2813 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2814 data_size = h->length - header_size;
2815 } else /* apv >= 89 */ {
2816 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2817 data_size = h->length - header_size;
2818 D_ASSERT(data_size == 0);
2819 }
2820
2821 /* initialize verify_alg and csums_alg */
2822 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2823
2824 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2825 return FALSE;
2826
2827 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2828
2829 if (apv >= 88) {
2830 if (apv == 88) {
2831 if (data_size > SHARED_SECRET_MAX) {
2832 dev_err(DEV, "verify-alg too long, "
2833 "peer wants %u, accepting only %u byte\n",
2834 data_size, SHARED_SECRET_MAX);
2835 return FALSE;
2836 }
2837
2838 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2839 return FALSE;
2840
2841 /* we expect NUL terminated string */
2842 /* but just in case someone tries to be evil */
2843 D_ASSERT(p->verify_alg[data_size-1] == 0);
2844 p->verify_alg[data_size-1] = 0;
2845
2846 } else /* apv >= 89 */ {
2847 /* we still expect NUL terminated strings */
2848 /* but just in case someone tries to be evil */
2849 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2850 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2851 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2852 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2853 }
2854
2855 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2856 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2857 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2858 mdev->sync_conf.verify_alg, p->verify_alg);
2859 goto disconnect;
2860 }
2861 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2862 p->verify_alg, "verify-alg");
2863 if (IS_ERR(verify_tfm)) {
2864 verify_tfm = NULL;
2865 goto disconnect;
2866 }
2867 }
2868
2869 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2870 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2871 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2872 mdev->sync_conf.csums_alg, p->csums_alg);
2873 goto disconnect;
2874 }
2875 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2876 p->csums_alg, "csums-alg");
2877 if (IS_ERR(csums_tfm)) {
2878 csums_tfm = NULL;
2879 goto disconnect;
2880 }
2881 }
2882
2883
2884 spin_lock(&mdev->peer_seq_lock);
2885 /* lock against drbd_nl_syncer_conf() */
2886 if (verify_tfm) {
2887 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2888 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2889 crypto_free_hash(mdev->verify_tfm);
2890 mdev->verify_tfm = verify_tfm;
2891 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2892 }
2893 if (csums_tfm) {
2894 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2895 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2896 crypto_free_hash(mdev->csums_tfm);
2897 mdev->csums_tfm = csums_tfm;
2898 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2899 }
2900 spin_unlock(&mdev->peer_seq_lock);
2901 }
2902
2903 return ok;
2904disconnect:
2905 /* just for completeness: actually not needed,
2906 * as this is not reached if csums_tfm was ok. */
2907 crypto_free_hash(csums_tfm);
2908 /* but free the verify_tfm again, if csums_tfm did not work out */
2909 crypto_free_hash(verify_tfm);
2910 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2911 return FALSE;
2912}
2913
2914static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2915{
2916 /* sorry, we currently have no working implementation
2917 * of distributed TCQ */
2918}
2919
2920/* warn if the arguments differ by more than 12.5% */
2921static void warn_if_differ_considerably(struct drbd_conf *mdev,
2922 const char *s, sector_t a, sector_t b)
2923{
2924 sector_t d;
2925 if (a == 0 || b == 0)
2926 return;
2927 d = (a > b) ? (a - b) : (b - a);
2928 if (d > (a>>3) || d > (b>>3))
2929 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2930 (unsigned long long)a, (unsigned long long)b);
2931}
2932
2933static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2934{
2935 struct p_sizes *p = (struct p_sizes *)h;
2936 enum determine_dev_size dd = unchanged;
2937 unsigned int max_seg_s;
2938 sector_t p_size, p_usize, my_usize;
2939 int ldsc = 0; /* local disk size changed */
Philipp Reisnere89b5912010-03-24 17:11:33 +01002940 enum dds_flags ddsf;
Philipp Reisnerb411b362009-09-25 16:07:19 -07002941
2942 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2943 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2944 return FALSE;
2945
2946 p_size = be64_to_cpu(p->d_size);
2947 p_usize = be64_to_cpu(p->u_size);
2948
2949 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2950 dev_err(DEV, "some backing storage is needed\n");
2951 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2952 return FALSE;
2953 }
2954
2955 /* just store the peer's disk size for now.
2956 * we still need to figure out whether we accept that. */
2957 mdev->p_size = p_size;
2958
2959#define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
2960 if (get_ldev(mdev)) {
2961 warn_if_differ_considerably(mdev, "lower level device sizes",
2962 p_size, drbd_get_max_capacity(mdev->ldev));
2963 warn_if_differ_considerably(mdev, "user requested size",
2964 p_usize, mdev->ldev->dc.disk_size);
2965
2966 /* if this is the first connect, or an otherwise expected
2967 * param exchange, choose the minimum */
2968 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2969 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2970 p_usize);
2971
2972 my_usize = mdev->ldev->dc.disk_size;
2973
2974 if (mdev->ldev->dc.disk_size != p_usize) {
2975 mdev->ldev->dc.disk_size = p_usize;
2976 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2977 (unsigned long)mdev->ldev->dc.disk_size);
2978 }
2979
2980 /* Never shrink a device with usable data during connect.
2981 But allow online shrinking if we are connected. */
Philipp Reisnera393db62009-12-22 13:35:52 +01002982 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
Philipp Reisnerb411b362009-09-25 16:07:19 -07002983 drbd_get_capacity(mdev->this_bdev) &&
2984 mdev->state.disk >= D_OUTDATED &&
2985 mdev->state.conn < C_CONNECTED) {
2986 dev_err(DEV, "The peer's disk size is too small!\n");
2987 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2988 mdev->ldev->dc.disk_size = my_usize;
2989 put_ldev(mdev);
2990 return FALSE;
2991 }
2992 put_ldev(mdev);
2993 }
2994#undef min_not_zero
2995
Philipp Reisnere89b5912010-03-24 17:11:33 +01002996 ddsf = be16_to_cpu(p->dds_flags);
Philipp Reisnerb411b362009-09-25 16:07:19 -07002997 if (get_ldev(mdev)) {
Philipp Reisnere89b5912010-03-24 17:11:33 +01002998 dd = drbd_determin_dev_size(mdev, ddsf);
Philipp Reisnerb411b362009-09-25 16:07:19 -07002999 put_ldev(mdev);
3000 if (dd == dev_size_error)
3001 return FALSE;
3002 drbd_md_sync(mdev);
3003 } else {
3004 /* I am diskless, need to accept the peer's size. */
3005 drbd_set_my_capacity(mdev, p_size);
3006 }
3007
Philipp Reisnerb411b362009-09-25 16:07:19 -07003008 if (get_ldev(mdev)) {
3009 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3010 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3011 ldsc = 1;
3012 }
3013
3014 max_seg_s = be32_to_cpu(p->max_segment_size);
3015 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3016 drbd_setup_queue_param(mdev, max_seg_s);
3017
Philipp Reisnere89b5912010-03-24 17:11:33 +01003018 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
Philipp Reisnerb411b362009-09-25 16:07:19 -07003019 put_ldev(mdev);
3020 }
3021
3022 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3023 if (be64_to_cpu(p->c_size) !=
3024 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3025 /* we have different sizes, probably peer
3026 * needs to know my new size... */
Philipp Reisnere89b5912010-03-24 17:11:33 +01003027 drbd_send_sizes(mdev, 0, ddsf);
Philipp Reisnerb411b362009-09-25 16:07:19 -07003028 }
3029 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3030 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3031 if (mdev->state.pdsk >= D_INCONSISTENT &&
Philipp Reisnere89b5912010-03-24 17:11:33 +01003032 mdev->state.disk >= D_INCONSISTENT) {
3033 if (ddsf & DDSF_NO_RESYNC)
3034 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3035 else
3036 resync_after_online_grow(mdev);
3037 } else
Philipp Reisnerb411b362009-09-25 16:07:19 -07003038 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3039 }
3040 }
3041
3042 return TRUE;
3043}
3044
3045static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3046{
3047 struct p_uuids *p = (struct p_uuids *)h;
3048 u64 *p_uuid;
3049 int i;
3050
3051 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3052 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3053 return FALSE;
3054
 3055 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return FALSE;
	}
 3056
 3057 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3058 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3059
3060 kfree(mdev->p_uuid);
3061 mdev->p_uuid = p_uuid;
3062
3063 if (mdev->state.conn < C_CONNECTED &&
3064 mdev->state.disk < D_INCONSISTENT &&
3065 mdev->state.role == R_PRIMARY &&
3066 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3067 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3068 (unsigned long long)mdev->ed_uuid);
3069 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3070 return FALSE;
3071 }
3072
3073 if (get_ldev(mdev)) {
3074 int skip_initial_sync =
3075 mdev->state.conn == C_CONNECTED &&
3076 mdev->agreed_pro_version >= 90 &&
3077 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3078 (p_uuid[UI_FLAGS] & 8);
3079 if (skip_initial_sync) {
3080 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3081 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3082 "clear_n_write from receive_uuids");
3083 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3084 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3085 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3086 CS_VERBOSE, NULL);
3087 drbd_md_sync(mdev);
3088 }
3089 put_ldev(mdev);
3090 }
3091
 3092 /* Before we test for the disk state, we should wait until any possibly
 3093 ongoing cluster wide state change has finished. That is important if
3094 we are primary and are detaching from our disk. We need to see the
3095 new disk state... */
3096 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3097 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3098 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3099
3100 return TRUE;
3101}
3102
3103/**
3104 * convert_state() - Converts the peer's view of the cluster state to our point of view
3105 * @ps: The state as seen by the peer.
3106 */
3107static union drbd_state convert_state(union drbd_state ps)
3108{
3109 union drbd_state ms;
3110
3111 static enum drbd_conns c_tab[] = {
3112 [C_CONNECTED] = C_CONNECTED,
3113
3114 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3115 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3116 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3117 [C_VERIFY_S] = C_VERIFY_T,
3118 [C_MASK] = C_MASK,
3119 };
3120
3121 ms.i = ps.i;
3122
3123 ms.conn = c_tab[ps.conn];
3124 ms.peer = ps.role;
3125 ms.role = ps.peer;
3126 ms.pdsk = ps.disk;
3127 ms.disk = ps.pdsk;
3128 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3129
3130 return ms;
3131}
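
/*
 * Illustrative sketch, not part of the driver: a concrete use of
 * convert_state().  The peer's role/peer and disk/pdsk pairs swap sides, and
 * asymmetric connection states map to their mirror image (e.g. the peer's
 * C_STARTING_SYNC_S is our C_STARTING_SYNC_T).  The wrapper name is made up.
 */
static inline union drbd_state example_peer_view_to_mine(union drbd_state ps)
{
	union drbd_state ms = convert_state(ps);

	/* e.g. ps.role == R_PRIMARY shows up as ms.peer == R_PRIMARY,
	 * and ps.disk becomes ms.pdsk, the peer's disk as seen from here. */
	return ms;
}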
3132
3133static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3134{
3135 struct p_req_state *p = (struct p_req_state *)h;
3136 union drbd_state mask, val;
3137 int rv;
3138
3139 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3140 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3141 return FALSE;
3142
3143 mask.i = be32_to_cpu(p->mask);
3144 val.i = be32_to_cpu(p->val);
3145
3146 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3147 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3148 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3149 return TRUE;
3150 }
3151
3152 mask = convert_state(mask);
3153 val = convert_state(val);
3154
3155 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3156
3157 drbd_send_sr_reply(mdev, rv);
3158 drbd_md_sync(mdev);
3159
3160 return TRUE;
3161}
3162
3163static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3164{
3165 struct p_state *p = (struct p_state *)h;
3166 enum drbd_conns nconn, oconn;
3167 union drbd_state ns, peer_state;
3168 enum drbd_disk_state real_peer_disk;
3169 int rv;
3170
3171 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3172 return FALSE;
3173
3174 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3175 return FALSE;
3176
3177 peer_state.i = be32_to_cpu(p->state);
3178
3179 real_peer_disk = peer_state.disk;
3180 if (peer_state.disk == D_NEGOTIATING) {
3181 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3182 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3183 }
3184
3185 spin_lock_irq(&mdev->req_lock);
3186 retry:
3187 oconn = nconn = mdev->state.conn;
3188 spin_unlock_irq(&mdev->req_lock);
3189
3190 if (nconn == C_WF_REPORT_PARAMS)
3191 nconn = C_CONNECTED;
3192
3193 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3194 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3195 int cr; /* consider resync */
3196
3197 /* if we established a new connection */
3198 cr = (oconn < C_CONNECTED);
3199 /* if we had an established connection
3200 * and one of the nodes newly attaches a disk */
3201 cr |= (oconn == C_CONNECTED &&
3202 (peer_state.disk == D_NEGOTIATING ||
3203 mdev->state.disk == D_NEGOTIATING));
3204 /* if we have both been inconsistent, and the peer has been
3205 * forced to be UpToDate with --overwrite-data */
3206 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3207 /* if we had been plain connected, and the admin requested to
3208 * start a sync by "invalidate" or "invalidate-remote" */
3209 cr |= (oconn == C_CONNECTED &&
3210 (peer_state.conn >= C_STARTING_SYNC_S &&
3211 peer_state.conn <= C_WF_BITMAP_T));
3212
3213 if (cr)
3214 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3215
3216 put_ldev(mdev);
3217 if (nconn == C_MASK) {
Lars Ellenberg580b9762010-02-26 23:15:23 +01003218 nconn = C_CONNECTED;
Philipp Reisnerb411b362009-09-25 16:07:19 -07003219 if (mdev->state.disk == D_NEGOTIATING) {
3220 drbd_force_state(mdev, NS(disk, D_DISKLESS));
Philipp Reisnerb411b362009-09-25 16:07:19 -07003221 } else if (peer_state.disk == D_NEGOTIATING) {
3222 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3223 peer_state.disk = D_DISKLESS;
Lars Ellenberg580b9762010-02-26 23:15:23 +01003224 real_peer_disk = D_DISKLESS;
Philipp Reisnerb411b362009-09-25 16:07:19 -07003225 } else {
Philipp Reisnercf14c2e2010-02-02 21:03:50 +01003226 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3227 return FALSE;
Philipp Reisnerb411b362009-09-25 16:07:19 -07003228 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3229 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3230 return FALSE;
3231 }
3232 }
3233 }
3234
3235 spin_lock_irq(&mdev->req_lock);
3236 if (mdev->state.conn != oconn)
3237 goto retry;
3238 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3239 ns.i = mdev->state.i;
3240 ns.conn = nconn;
3241 ns.peer = peer_state.role;
3242 ns.pdsk = real_peer_disk;
3243 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3244 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3245 ns.disk = mdev->new_state_tmp.disk;
3246
3247 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3248 ns = mdev->state;
3249 spin_unlock_irq(&mdev->req_lock);
3250
3251 if (rv < SS_SUCCESS) {
3252 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3253 return FALSE;
3254 }
3255
3256 if (oconn > C_WF_REPORT_PARAMS) {
3257 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3258 peer_state.disk != D_NEGOTIATING ) {
3259 /* we want resync, peer has not yet decided to sync... */
3260 /* Nowadays only used when forcing a node into primary role and
3261 setting its disk to UpToDate with that */
3262 drbd_send_uuids(mdev);
3263 drbd_send_state(mdev);
3264 }
3265 }
3266
3267 mdev->net_conf->want_lose = 0;
3268
3269 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3270
3271 return TRUE;
3272}
3273
3274static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3275{
3276 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3277
3278 wait_event(mdev->misc_wait,
3279 mdev->state.conn == C_WF_SYNC_UUID ||
3280 mdev->state.conn < C_CONNECTED ||
3281 mdev->state.disk < D_NEGOTIATING);
3282
3283 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3284
3285 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3286 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3287 return FALSE;
3288
3289 /* Here the _drbd_uuid_ functions are right, current should
3290 _not_ be rotated into the history */
3291 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3292 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3293 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3294
3295 drbd_start_resync(mdev, C_SYNC_TARGET);
3296
3297 put_ldev(mdev);
3298 } else
3299 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3300
3301 return TRUE;
3302}
3303
3304enum receive_bitmap_ret { OK, DONE, FAILED };
3305
3306static enum receive_bitmap_ret
3307receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3308 unsigned long *buffer, struct bm_xfer_ctx *c)
3309{
3310 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3311 unsigned want = num_words * sizeof(long);
3312
3313 if (want != h->length) {
3314 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3315 return FAILED;
3316 }
3317 if (want == 0)
3318 return DONE;
3319 if (drbd_recv(mdev, buffer, want) != want)
3320 return FAILED;
3321
3322 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3323
3324 c->word_offset += num_words;
3325 c->bit_offset = c->word_offset * BITS_PER_LONG;
3326 if (c->bit_offset > c->bm_bits)
3327 c->bit_offset = c->bm_bits;
3328
3329 return OK;
3330}
3331
3332static enum receive_bitmap_ret
3333recv_bm_rle_bits(struct drbd_conf *mdev,
3334 struct p_compressed_bm *p,
3335 struct bm_xfer_ctx *c)
3336{
3337 struct bitstream bs;
3338 u64 look_ahead;
3339 u64 rl;
3340 u64 tmp;
3341 unsigned long s = c->bit_offset;
3342 unsigned long e;
3343 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3344 int toggle = DCBP_get_start(p);
3345 int have;
3346 int bits;
3347
3348 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3349
3350 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3351 if (bits < 0)
3352 return FAILED;
3353
3354 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3355 bits = vli_decode_bits(&rl, look_ahead);
3356 if (bits <= 0)
3357 return FAILED;
3358
3359 if (toggle) {
3360 e = s + rl -1;
3361 if (e >= c->bm_bits) {
3362 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3363 return FAILED;
3364 }
3365 _drbd_bm_set_bits(mdev, s, e);
3366 }
3367
3368 if (have < bits) {
3369 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3370 have, bits, look_ahead,
3371 (unsigned int)(bs.cur.b - p->code),
3372 (unsigned int)bs.buf_len);
3373 return FAILED;
3374 }
3375 look_ahead >>= bits;
3376 have -= bits;
3377
3378 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3379 if (bits < 0)
3380 return FAILED;
3381 look_ahead |= tmp << have;
3382 have += bits;
3383 }
3384
3385 c->bit_offset = s;
3386 bm_xfer_ctx_bit_to_word_offset(c);
3387
3388 return (s == c->bm_bits) ? DONE : OK;
3389}
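
/*
 * Illustrative sketch, not part of the driver: the core idea of the RLE
 * decoding loop above, stripped of the VLI bitstream handling.  The packet
 * carries alternating run lengths of cleared and set bits; "toggle" starts as
 * DCBP_get_start() and flips after every run, and only the "set" runs touch
 * the bitmap.  The array input is hypothetical; runs are assumed non-zero,
 * as in the real decoder.
 */
static inline void example_apply_rle_runs(struct drbd_conf *mdev,
					  const u64 *runs, int n_runs, int toggle)
{
	unsigned long s = 0;
	int i;

	for (i = 0; i < n_runs; i++, toggle = !toggle) {
		if (toggle)
			_drbd_bm_set_bits(mdev, s, s + runs[i] - 1);
		s += runs[i];
	}
}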
3390
3391static enum receive_bitmap_ret
3392decode_bitmap_c(struct drbd_conf *mdev,
3393 struct p_compressed_bm *p,
3394 struct bm_xfer_ctx *c)
3395{
3396 if (DCBP_get_code(p) == RLE_VLI_Bits)
3397 return recv_bm_rle_bits(mdev, p, c);
3398
3399 /* other variants had been implemented for evaluation,
3400 * but have been dropped as this one turned out to be "best"
3401 * during all our tests. */
3402
3403 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3404 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3405 return FAILED;
3406}
3407
3408void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3409 const char *direction, struct bm_xfer_ctx *c)
3410{
3411 /* what would it take to transfer it "plaintext" */
3412 unsigned plain = sizeof(struct p_header) *
3413 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3414 + c->bm_words * sizeof(long);
3415 unsigned total = c->bytes[0] + c->bytes[1];
3416 unsigned r;
3417
3418 /* total can not be zero. but just in case: */
3419 if (total == 0)
3420 return;
3421
3422 /* don't report if not compressed */
3423 if (total >= plain)
3424 return;
3425
3426 /* total < plain. check for overflow, still */
3427 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3428 : (1000 * total / plain);
3429
3430 if (r > 1000)
3431 r = 1000;
3432
3433 r = 1000 - r;
3434 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3435 "total %u; compression: %u.%u%%\n",
3436 direction,
3437 c->bytes[1], c->packets[1],
3438 c->bytes[0], c->packets[0],
3439 total, r/10, r % 10);
3440}
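
/*
 * Illustrative sketch, not part of the driver: the per-mille arithmetic used
 * by INFO_bm_xfer_stats() above.  The saved fraction is computed as an
 * integer in units of 0.1%, with the operands reordered when "total" is so
 * large that 1000 * total would overflow an unsigned int.
 */
static inline unsigned int example_saved_permille(unsigned int total,
						  unsigned int plain)
{
	unsigned int r = (total > UINT_MAX/1000) ? (total / (plain/1000))
						 : (1000 * total / plain);
	if (r > 1000)
		r = 1000;
	return 1000 - r;	/* printed above as r/10 "." r%10 "%" */
}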
3441
3442/* Since we are processing the bitfield from lower addresses to higher,
 3443 it does not matter whether we process it in 32 bit chunks or 64 bit
 3444 chunks as long as it is little endian. (Understand it as a byte stream,
 3445 beginning with the lowest byte...) If we used big endian
3446 we would need to process it from the highest address to the lowest,
3447 in order to be agnostic to the 32 vs 64 bits issue.
3448
3449 returns 0 on failure, 1 if we successfully received it. */
3450static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3451{
3452 struct bm_xfer_ctx c;
3453 void *buffer;
3454 enum receive_bitmap_ret ret;
3455 int ok = FALSE;
3456
3457 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3458
3459 drbd_bm_lock(mdev, "receive bitmap");
3460
3461 /* maybe we should use some per thread scratch page,
3462 * and allocate that during initial device creation? */
3463 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3464 if (!buffer) {
3465 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3466 goto out;
3467 }
3468
3469 c = (struct bm_xfer_ctx) {
3470 .bm_bits = drbd_bm_bits(mdev),
3471 .bm_words = drbd_bm_words(mdev),
3472 };
3473
3474 do {
3475 if (h->command == P_BITMAP) {
3476 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3477 } else if (h->command == P_COMPRESSED_BITMAP) {
3478 /* MAYBE: sanity check that we speak proto >= 90,
3479 * and the feature is enabled! */
3480 struct p_compressed_bm *p;
3481
3482 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3483 dev_err(DEV, "ReportCBitmap packet too large\n");
3484 goto out;
3485 }
3486			/* use the page buffer */
3487 p = buffer;
3488 memcpy(p, h, sizeof(*h));
3489 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3490 goto out;
3491 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3492 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3493				goto out;
3494 }
3495 ret = decode_bitmap_c(mdev, p, &c);
3496 } else {
3497			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3498 goto out;
3499 }
3500
3501 c.packets[h->command == P_BITMAP]++;
3502 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3503
3504 if (ret != OK)
3505 break;
3506
3507 if (!drbd_recv_header(mdev, h))
3508 goto out;
3509 } while (ret == OK);
3510 if (ret == FAILED)
3511 goto out;
3512
3513 INFO_bm_xfer_stats(mdev, "receive", &c);
3514
3515 if (mdev->state.conn == C_WF_BITMAP_T) {
3516 ok = !drbd_send_bitmap(mdev);
3517 if (!ok)
3518 goto out;
3519 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3520 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3521 D_ASSERT(ok == SS_SUCCESS);
3522 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3523 /* admin may have requested C_DISCONNECTING,
3524 * other threads may have noticed network errors */
3525 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3526 drbd_conn_str(mdev->state.conn));
3527 }
3528
3529 ok = TRUE;
3530 out:
3531 drbd_bm_unlock(mdev);
3532 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3533 drbd_start_resync(mdev, C_SYNC_SOURCE);
3534 free_page((unsigned long) buffer);
3535 return ok;
3536}
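
/*
 * Illustrative sketch only: why the chunk size does not matter for the
 * little endian bitmap stream described above receive_bitmap().  Bit n
 * always lives in byte n/8 of the stream, no matter whether the stream is
 * later copied around in 32 bit or 64 bit words.  example_test_bit_le() is
 * a made-up helper, not the driver's bitmap accessor.
 */
static int example_test_bit_le(const unsigned char *stream, unsigned long bitnr)
{
	return (stream[bitnr / 8] >> (bitnr % 8)) & 1;
}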
3537
3538static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3539{
3540 /* TODO zero copy sink :) */
3541 static char sink[128];
3542 int size, want, r;
3543
3544 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3545 h->command, h->length);
3546
3547 size = h->length;
3548 while (size > 0) {
3549 want = min_t(int, size, sizeof(sink));
3550 r = drbd_recv(mdev, sink, want);
3551 ERR_IF(r <= 0) break;
3552 size -= r;
3553 }
3554 return size == 0;
3555}
3556
3557static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3558{
3559 if (mdev->state.disk >= D_INCONSISTENT)
3560 drbd_kick_lo(mdev);
3561
3562 /* Make sure we've acked all the TCP data associated
3563 * with the data requests being unplugged */
3564 drbd_tcp_quickack(mdev->data.socket);
3565
3566 return TRUE;
3567}
3568
3569static void timeval_sub_us(struct timeval *tv, unsigned int us)
3570{
3571 tv->tv_sec -= us / 1000000;
3572 us = us % 1000000;
3573	if (tv->tv_usec < us) {
3574 tv->tv_usec += 1000000;
3575 tv->tv_sec--;
3576 }
3577 tv->tv_usec -= us;
3578}
3579
3580static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3581{
3582 struct delay_probe *dp;
3583 struct list_head *le;
3584 struct timeval now;
3585 int seq_num;
3586 int offset;
3587 int data_delay;
3588
3589 seq_num = be32_to_cpu(p->seq_num);
3590 offset = be32_to_cpu(p->offset);
3591
3592 spin_lock(&mdev->peer_seq_lock);
3593 if (!list_empty(&mdev->delay_probes)) {
3594 if (from == USE_DATA_SOCKET)
3595 le = mdev->delay_probes.next;
3596 else
3597 le = mdev->delay_probes.prev;
3598
3599 dp = list_entry(le, struct delay_probe, list);
3600
3601 if (dp->seq_num == seq_num) {
3602 list_del(le);
3603 spin_unlock(&mdev->peer_seq_lock);
3604 do_gettimeofday(&now);
3605 timeval_sub_us(&now, offset);
3606 data_delay =
3607 now.tv_usec - dp->time.tv_usec +
3608 (now.tv_sec - dp->time.tv_sec) * 1000000;
3609
3610 if (data_delay > 0)
3611 mdev->data_delay = data_delay;
3612
3613 kfree(dp);
3614 return;
3615 }
3616
3617 if (dp->seq_num > seq_num) {
3618 spin_unlock(&mdev->peer_seq_lock);
3619 dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3620			return; /* Do not allocate a struct delay_probe. */
3621 }
3622 }
3623 spin_unlock(&mdev->peer_seq_lock);
3624
3625 dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3626 if (!dp) {
3627 dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3628 return;
3629 }
3630
3631 dp->seq_num = seq_num;
3632 do_gettimeofday(&dp->time);
3633 timeval_sub_us(&dp->time, offset);
3634
3635 spin_lock(&mdev->peer_seq_lock);
3636 if (from == USE_DATA_SOCKET)
3637 list_add(&dp->list, &mdev->delay_probes);
3638 else
3639 list_add_tail(&dp->list, &mdev->delay_probes);
3640 spin_unlock(&mdev->peer_seq_lock);
3641}
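
/*
 * Illustrative sketch only: the delay computed above is simply the
 * difference of two struct timeval values, expressed in microseconds.
 * example_tv_delta_us() is a hypothetical helper, not used by the driver.
 */
static long example_tv_delta_us(const struct timeval *later,
				const struct timeval *earlier)
{
	return (later->tv_sec - earlier->tv_sec) * 1000000L +
	       (later->tv_usec - earlier->tv_usec);
}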
3642
3643static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3644{
3645 struct p_delay_probe *p = (struct p_delay_probe *)h;
3646
3647 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3648 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3649 return FALSE;
3650
3651 got_delay_probe(mdev, USE_DATA_SOCKET, p);
3652 return TRUE;
3653}
3654
3655typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3656
3657static drbd_cmd_handler_f drbd_default_handler[] = {
3658 [P_DATA] = receive_Data,
3659 [P_DATA_REPLY] = receive_DataReply,
3660 [P_RS_DATA_REPLY] = receive_RSDataReply,
3661 [P_BARRIER] = receive_Barrier,
3662 [P_BITMAP] = receive_bitmap,
3663 [P_COMPRESSED_BITMAP] = receive_bitmap,
3664 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3665 [P_DATA_REQUEST] = receive_DataRequest,
3666 [P_RS_DATA_REQUEST] = receive_DataRequest,
3667 [P_SYNC_PARAM] = receive_SyncParam,
3668 [P_SYNC_PARAM89] = receive_SyncParam,
3669 [P_PROTOCOL] = receive_protocol,
3670 [P_UUIDS] = receive_uuids,
3671 [P_SIZES] = receive_sizes,
3672 [P_STATE] = receive_state,
3673 [P_STATE_CHG_REQ] = receive_req_state,
3674 [P_SYNC_UUID] = receive_sync_uuid,
3675 [P_OV_REQUEST] = receive_DataRequest,
3676 [P_OV_REPLY] = receive_DataRequest,
3677 [P_CSUM_RS_REQUEST] = receive_DataRequest,
3678	[P_DELAY_PROBE]     = receive_delay_probe,
3679	/* anything missing from this table is in
3680 * the asender_tbl, see get_asender_cmd */
3681 [P_MAX_CMD] = NULL,
3682};
3683
3684static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3685static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3686
3687static void drbdd(struct drbd_conf *mdev)
3688{
3689 drbd_cmd_handler_f handler;
3690 struct p_header *header = &mdev->data.rbuf.header;
3691
3692 while (get_t_state(&mdev->receiver) == Running) {
3693 drbd_thread_current_set_cpu(mdev);
3694		if (!drbd_recv_header(mdev, header)) {
3695			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3696			break;
3697		}
3698
3699 if (header->command < P_MAX_CMD)
3700 handler = drbd_cmd_handler[header->command];
3701 else if (P_MAY_IGNORE < header->command
3702 && header->command < P_MAX_OPT_CMD)
3703 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3704 else if (header->command > P_MAX_OPT_CMD)
3705 handler = receive_skip;
3706 else
3707 handler = NULL;
3708
3709 if (unlikely(!handler)) {
3710 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3711 header->command, header->length);
3712 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3713 break;
3714 }
3715 if (unlikely(!handler(mdev, header))) {
3716 dev_err(DEV, "error receiving %s, l: %d!\n",
3717 cmdname(header->command), header->length);
3718 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3719 break;
3720 }
3721	}
3722}
3723
3724static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3725{
3726 struct hlist_head *slot;
3727 struct hlist_node *pos;
3728 struct hlist_node *tmp;
3729 struct drbd_request *req;
3730 int i;
3731
3732 /*
3733 * Application READ requests
3734 */
3735 spin_lock_irq(&mdev->req_lock);
3736 for (i = 0; i < APP_R_HSIZE; i++) {
3737 slot = mdev->app_reads_hash+i;
3738 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3739 /* it may (but should not any longer!)
3740 * be on the work queue; if that assert triggers,
3741 * we need to also grab the
3742 * spin_lock_irq(&mdev->data.work.q_lock);
3743 * and list_del_init here. */
3744 D_ASSERT(list_empty(&req->w.list));
3745 /* It would be nice to complete outside of spinlock.
3746 * But this is easier for now. */
3747 _req_mod(req, connection_lost_while_pending);
3748 }
3749 }
3750 for (i = 0; i < APP_R_HSIZE; i++)
3751 if (!hlist_empty(mdev->app_reads_hash+i))
3752 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3753 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3754
3755 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3756 spin_unlock_irq(&mdev->req_lock);
3757}
3758
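/*
 * Barrier pattern: queue a work item whose only job is to complete
 * barr.done, then wait for that completion.  Once wait_for_completion()
 * returns, every work item that was queued on mdev->data.work before the
 * barrier has been processed by the worker.
 */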
3759void drbd_flush_workqueue(struct drbd_conf *mdev)
3760{
3761 struct drbd_wq_barrier barr;
3762
3763 barr.w.cb = w_prev_work_done;
3764 init_completion(&barr.done);
3765 drbd_queue_work(&mdev->data.work, &barr.w);
3766 wait_for_completion(&barr.done);
3767}
3768
3769static void drbd_disconnect(struct drbd_conf *mdev)
3770{
3771 enum drbd_fencing_p fp;
3772 union drbd_state os, ns;
3773 int rv = SS_UNKNOWN_ERROR;
3774 unsigned int i;
3775
3776 if (mdev->state.conn == C_STANDALONE)
3777 return;
3778 if (mdev->state.conn >= C_WF_CONNECTION)
3779 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3780 drbd_conn_str(mdev->state.conn));
3781
3782 /* asender does not clean up anything. it must not interfere, either */
3783 drbd_thread_stop(&mdev->asender);
3784	drbd_free_sock(mdev);
3785
3786 spin_lock_irq(&mdev->req_lock);
3787 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3788 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3789 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3790 spin_unlock_irq(&mdev->req_lock);
3791
3792 /* We do not have data structures that would allow us to
3793 * get the rs_pending_cnt down to 0 again.
3794 * * On C_SYNC_TARGET we do not have any data structures describing
3795 * the pending RSDataRequest's we have sent.
3796 * * On C_SYNC_SOURCE there is no data structure that tracks
3797 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3798 * And no, it is not the sum of the reference counts in the
3799 * resync_LRU. The resync_LRU tracks the whole operation including
3800 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3801 * on the fly. */
3802 drbd_rs_cancel_all(mdev);
3803 mdev->rs_total = 0;
3804 mdev->rs_failed = 0;
3805 atomic_set(&mdev->rs_pending_cnt, 0);
3806 wake_up(&mdev->misc_wait);
3807
3808 /* make sure syncer is stopped and w_resume_next_sg queued */
3809 del_timer_sync(&mdev->resync_timer);
3810 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3811 resync_timer_fn((unsigned long)mdev);
3812
3813	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3814 * w_make_resync_request etc. which may still be on the worker queue
3815 * to be "canceled" */
3816 drbd_flush_workqueue(mdev);
3817
3818 /* This also does reclaim_net_ee(). If we do this too early, we might
3819 * miss some resync ee and pages.*/
3820 drbd_process_done_ee(mdev);
3821
3822 kfree(mdev->p_uuid);
3823 mdev->p_uuid = NULL;
3824
3825 if (!mdev->state.susp)
3826 tl_clear(mdev);
3827
3828 drbd_fail_pending_reads(mdev);
3829
3830 dev_info(DEV, "Connection closed\n");
3831
3832 drbd_md_sync(mdev);
3833
3834 fp = FP_DONT_CARE;
3835 if (get_ldev(mdev)) {
3836 fp = mdev->ldev->dc.fencing;
3837 put_ldev(mdev);
3838 }
3839
3840 if (mdev->state.role == R_PRIMARY) {
3841 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3842 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3843 drbd_request_state(mdev, NS(pdsk, nps));
3844 }
3845 }
3846
3847 spin_lock_irq(&mdev->req_lock);
3848 os = mdev->state;
3849 if (os.conn >= C_UNCONNECTED) {
3850 /* Do not restart in case we are C_DISCONNECTING */
3851 ns = os;
3852 ns.conn = C_UNCONNECTED;
3853 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3854 }
3855 spin_unlock_irq(&mdev->req_lock);
3856
3857 if (os.conn == C_DISCONNECTING) {
3858 struct hlist_head *h;
3859 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3860
3861 /* we must not free the tl_hash
3862 * while application io is still on the fly */
3863 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3864
3865 spin_lock_irq(&mdev->req_lock);
3866 /* paranoia code */
3867 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3868 if (h->first)
3869 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3870 (int)(h - mdev->ee_hash), h->first);
3871 kfree(mdev->ee_hash);
3872 mdev->ee_hash = NULL;
3873 mdev->ee_hash_s = 0;
3874
3875 /* paranoia code */
3876 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3877 if (h->first)
3878 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3879 (int)(h - mdev->tl_hash), h->first);
3880 kfree(mdev->tl_hash);
3881 mdev->tl_hash = NULL;
3882 mdev->tl_hash_s = 0;
3883 spin_unlock_irq(&mdev->req_lock);
3884
3885 crypto_free_hash(mdev->cram_hmac_tfm);
3886 mdev->cram_hmac_tfm = NULL;
3887
3888 kfree(mdev->net_conf);
3889 mdev->net_conf = NULL;
3890 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3891 }
3892
3893 /* tcp_close and release of sendpage pages can be deferred. I don't
3894 * want to use SO_LINGER, because apparently it can be deferred for
3895 * more than 20 seconds (longest time I checked).
3896 *
3897	 * Actually we don't care exactly when the network stack does its
3898 * put_page(), but release our reference on these pages right here.
3899 */
3900 i = drbd_release_ee(mdev, &mdev->net_ee);
3901 if (i)
3902 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3903 i = atomic_read(&mdev->pp_in_use);
3904 if (i)
3905		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3906
3907 D_ASSERT(list_empty(&mdev->read_ee));
3908 D_ASSERT(list_empty(&mdev->active_ee));
3909 D_ASSERT(list_empty(&mdev->sync_ee));
3910 D_ASSERT(list_empty(&mdev->done_ee));
3911
3912 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3913 atomic_set(&mdev->current_epoch->epoch_size, 0);
3914 D_ASSERT(list_empty(&mdev->current_epoch->list));
3915}
3916
3917/*
3918 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3919 * we can agree on is stored in agreed_pro_version.
3920 *
3921 * feature flags and the reserved array should be enough room for future
3922 * enhancements of the handshake protocol, and possible plugins...
3923 *
3924 * for now, they are expected to be zero, but ignored.
3925 */
3926static int drbd_send_handshake(struct drbd_conf *mdev)
3927{
3928 /* ASSERT current == mdev->receiver ... */
3929 struct p_handshake *p = &mdev->data.sbuf.handshake;
3930 int ok;
3931
3932 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3933 dev_err(DEV, "interrupted during initial handshake\n");
3934 return 0; /* interrupted. not ok. */
3935 }
3936
3937 if (mdev->data.socket == NULL) {
3938 mutex_unlock(&mdev->data.mutex);
3939 return 0;
3940 }
3941
3942 memset(p, 0, sizeof(*p));
3943 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3944 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3945	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3946			    (struct p_header *)p, sizeof(*p), 0);
3947 mutex_unlock(&mdev->data.mutex);
3948 return ok;
3949}
3950
3951/*
3952 * return values:
3953 * 1 yes, we have a valid connection
3954 * 0 oops, did not work out, please try again
3955 * -1 peer talks different language,
3956 * no point in trying again, please go standalone.
3957 */
3958static int drbd_do_handshake(struct drbd_conf *mdev)
3959{
3960 /* ASSERT current == mdev->receiver ... */
3961 struct p_handshake *p = &mdev->data.rbuf.handshake;
3962 const int expect = sizeof(struct p_handshake)
3963 -sizeof(struct p_header);
3964 int rv;
3965
3966 rv = drbd_send_handshake(mdev);
3967 if (!rv)
3968 return 0;
3969
3970 rv = drbd_recv_header(mdev, &p->head);
3971 if (!rv)
3972 return 0;
3973
3974 if (p->head.command != P_HAND_SHAKE) {
3975 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3976 cmdname(p->head.command), p->head.command);
3977 return -1;
3978 }
3979
3980 if (p->head.length != expect) {
3981 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3982 expect, p->head.length);
3983 return -1;
3984 }
3985
3986 rv = drbd_recv(mdev, &p->head.payload, expect);
3987
3988 if (rv != expect) {
3989 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3990 return 0;
3991 }
3992
3993	p->protocol_min = be32_to_cpu(p->protocol_min);
3994 p->protocol_max = be32_to_cpu(p->protocol_max);
3995 if (p->protocol_max == 0)
3996 p->protocol_max = p->protocol_min;
3997
3998 if (PRO_VERSION_MAX < p->protocol_min ||
3999 PRO_VERSION_MIN > p->protocol_max)
4000 goto incompat;
4001
4002 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4003
4004 dev_info(DEV, "Handshake successful: "
4005 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4006
4007 return 1;
4008
4009 incompat:
4010 dev_err(DEV, "incompatible DRBD dialects: "
4011 "I support %d-%d, peer supports %d-%d\n",
4012 PRO_VERSION_MIN, PRO_VERSION_MAX,
4013 p->protocol_min, p->protocol_max);
4014 return -1;
4015}
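
/*
 * Illustrative sketch only: the version agreement rule implemented above,
 * written as a standalone helper.  example_agree_version() is a
 * hypothetical name; the driver stores the result in agreed_pro_version.
 */
static int example_agree_version(int my_min, int my_max, int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;	/* no common protocol version */

	return my_max < peer_max ? my_max : peer_max;
}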
4016
4017#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4018static int drbd_do_auth(struct drbd_conf *mdev)
4019{
4020	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4021 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4022	return -1;
4023}
4024#else
4025#define CHALLENGE_LEN 64
4026
4027/* Return value:
4028 1 - auth succeeded,
4029 0 - failed, try again (network error),
4030 -1 - auth failed, don't try again.
4031*/
4032
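/*
 * Challenge/response outline (descriptive only): both peers share
 * net_conf->shared_secret.  Each side sends a random challenge, answers
 * the peer's challenge with HMAC(secret, peer_challenge), and verifies the
 * peer's answer against HMAC(secret, own_challenge).  Knowledge of the
 * secret is proven without ever sending the secret itself.
 */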
4033static int drbd_do_auth(struct drbd_conf *mdev)
4034{
4035 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4036 struct scatterlist sg;
4037 char *response = NULL;
4038 char *right_response = NULL;
4039 char *peers_ch = NULL;
4040 struct p_header p;
4041 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4042 unsigned int resp_size;
4043 struct hash_desc desc;
4044 int rv;
4045
4046 desc.tfm = mdev->cram_hmac_tfm;
4047 desc.flags = 0;
4048
4049 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4050 (u8 *)mdev->net_conf->shared_secret, key_len);
4051 if (rv) {
4052 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4053		rv = -1;
4054		goto fail;
4055 }
4056
4057 get_random_bytes(my_challenge, CHALLENGE_LEN);
4058
4059 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4060 if (!rv)
4061 goto fail;
4062
4063 rv = drbd_recv_header(mdev, &p);
4064 if (!rv)
4065 goto fail;
4066
4067 if (p.command != P_AUTH_CHALLENGE) {
4068 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4069 cmdname(p.command), p.command);
4070 rv = 0;
4071 goto fail;
4072 }
4073
4074 if (p.length > CHALLENGE_LEN*2) {
4075		dev_err(DEV, "AuthChallenge payload too big.\n");
4076		rv = -1;
4077		goto fail;
4078 }
4079
4080 peers_ch = kmalloc(p.length, GFP_NOIO);
4081 if (peers_ch == NULL) {
4082 dev_err(DEV, "kmalloc of peers_ch failed\n");
4083		rv = -1;
4084		goto fail;
4085 }
4086
4087 rv = drbd_recv(mdev, peers_ch, p.length);
4088
4089 if (rv != p.length) {
4090 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4091 rv = 0;
4092 goto fail;
4093 }
4094
4095 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4096 response = kmalloc(resp_size, GFP_NOIO);
4097 if (response == NULL) {
4098 dev_err(DEV, "kmalloc of response failed\n");
4099		rv = -1;
4100		goto fail;
4101 }
4102
4103 sg_init_table(&sg, 1);
4104 sg_set_buf(&sg, peers_ch, p.length);
4105
4106 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4107 if (rv) {
4108 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4109		rv = -1;
4110		goto fail;
4111 }
4112
4113 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4114 if (!rv)
4115 goto fail;
4116
4117 rv = drbd_recv_header(mdev, &p);
4118 if (!rv)
4119 goto fail;
4120
4121 if (p.command != P_AUTH_RESPONSE) {
4122 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4123 cmdname(p.command), p.command);
4124 rv = 0;
4125 goto fail;
4126 }
4127
4128 if (p.length != resp_size) {
4129		dev_err(DEV, "AuthResponse payload has the wrong size\n");
4130 rv = 0;
4131 goto fail;
4132 }
4133
4134	rv = drbd_recv(mdev, response, resp_size);
4135
4136 if (rv != resp_size) {
4137 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4138 rv = 0;
4139 goto fail;
4140 }
4141
4142 right_response = kmalloc(resp_size, GFP_NOIO);
4143	if (right_response == NULL) {
4144		dev_err(DEV, "kmalloc of right_response failed\n");
4145		rv = -1;
4146		goto fail;
4147 }
4148
4149 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4150
4151 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4152 if (rv) {
4153 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4154		rv = -1;
4155		goto fail;
4156 }
4157
4158 rv = !memcmp(response, right_response, resp_size);
4159
4160 if (rv)
4161 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4162 resp_size, mdev->net_conf->cram_hmac_alg);
4163	else
4164		rv = -1;
4165
4166 fail:
4167 kfree(peers_ch);
4168 kfree(response);
4169 kfree(right_response);
4170
4171 return rv;
4172}
4173#endif
4174
4175int drbdd_init(struct drbd_thread *thi)
4176{
4177 struct drbd_conf *mdev = thi->mdev;
4178 unsigned int minor = mdev_to_minor(mdev);
4179 int h;
4180
4181 sprintf(current->comm, "drbd%d_receiver", minor);
4182
4183 dev_info(DEV, "receiver (re)started\n");
4184
4185 do {
4186 h = drbd_connect(mdev);
4187 if (h == 0) {
4188 drbd_disconnect(mdev);
4189 __set_current_state(TASK_INTERRUPTIBLE);
4190 schedule_timeout(HZ);
4191 }
4192 if (h == -1) {
4193 dev_warn(DEV, "Discarding network configuration.\n");
4194 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4195 }
4196 } while (h == 0);
4197
4198 if (h > 0) {
4199 if (get_net_conf(mdev)) {
4200 drbdd(mdev);
4201 put_net_conf(mdev);
4202 }
4203 }
4204
4205 drbd_disconnect(mdev);
4206
4207 dev_info(DEV, "receiver terminated\n");
4208 return 0;
4209}
4210
4211/* ********* acknowledge sender ******** */
4212
4213static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4214{
4215 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4216
4217 int retcode = be32_to_cpu(p->retcode);
4218
4219 if (retcode >= SS_SUCCESS) {
4220 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4221 } else {
4222 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4223 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4224 drbd_set_st_err_str(retcode), retcode);
4225 }
4226 wake_up(&mdev->state_wait);
4227
4228 return TRUE;
4229}
4230
4231static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4232{
4233 return drbd_send_ping_ack(mdev);
4234
4235}
4236
4237static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4238{
4239 /* restore idle timeout */
4240 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4241	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4242		wake_up(&mdev->misc_wait);
4243
4244 return TRUE;
4245}
4246
4247static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4248{
4249 struct p_block_ack *p = (struct p_block_ack *)h;
4250 sector_t sector = be64_to_cpu(p->sector);
4251 int blksize = be32_to_cpu(p->blksize);
4252
4253 D_ASSERT(mdev->agreed_pro_version >= 89);
4254
4255 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4256
4257 drbd_rs_complete_io(mdev, sector);
4258 drbd_set_in_sync(mdev, sector, blksize);
4259 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4260 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4261 dec_rs_pending(mdev);
4262
4263 return TRUE;
4264}
4265
4266/* when we receive the ACK for a write request,
4267 * verify that we actually know about it */
4268static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4269 u64 id, sector_t sector)
4270{
4271 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4272 struct hlist_node *n;
4273 struct drbd_request *req;
4274
4275 hlist_for_each_entry(req, n, slot, colision) {
4276 if ((unsigned long)req == (unsigned long)id) {
4277 if (req->sector != sector) {
4278 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4279 "wrong sector (%llus versus %llus)\n", req,
4280 (unsigned long long)req->sector,
4281 (unsigned long long)sector);
4282 break;
4283 }
4284 return req;
4285 }
4286 }
4287 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4288 (void *)(unsigned long)id, (unsigned long long)sector);
4289 return NULL;
4290}
4291
4292typedef struct drbd_request *(req_validator_fn)
4293 (struct drbd_conf *mdev, u64 id, sector_t sector);
4294
4295static int validate_req_change_req_state(struct drbd_conf *mdev,
4296 u64 id, sector_t sector, req_validator_fn validator,
4297 const char *func, enum drbd_req_event what)
4298{
4299 struct drbd_request *req;
4300 struct bio_and_error m;
4301
4302 spin_lock_irq(&mdev->req_lock);
4303 req = validator(mdev, id, sector);
4304 if (unlikely(!req)) {
4305 spin_unlock_irq(&mdev->req_lock);
4306 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4307 return FALSE;
4308 }
4309 __req_mod(req, what, &m);
4310 spin_unlock_irq(&mdev->req_lock);
4311
4312 if (m.bio)
4313 complete_master_bio(mdev, &m);
4314 return TRUE;
4315}
4316
4317static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4318{
4319 struct p_block_ack *p = (struct p_block_ack *)h;
4320 sector_t sector = be64_to_cpu(p->sector);
4321 int blksize = be32_to_cpu(p->blksize);
4322 enum drbd_req_event what;
4323
4324 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4325
4326 if (is_syncer_block_id(p->block_id)) {
4327 drbd_set_in_sync(mdev, sector, blksize);
4328 dec_rs_pending(mdev);
4329 return TRUE;
4330 }
4331 switch (be16_to_cpu(h->command)) {
4332 case P_RS_WRITE_ACK:
4333 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4334 what = write_acked_by_peer_and_sis;
4335 break;
4336 case P_WRITE_ACK:
4337 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4338 what = write_acked_by_peer;
4339 break;
4340 case P_RECV_ACK:
4341 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4342 what = recv_acked_by_peer;
4343 break;
4344 case P_DISCARD_ACK:
4345 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4346 what = conflict_discarded_by_peer;
4347 break;
4348 default:
4349 D_ASSERT(0);
4350 return FALSE;
4351 }
4352
4353 return validate_req_change_req_state(mdev, p->block_id, sector,
4354 _ack_id_to_req, __func__ , what);
4355}
4356
4357static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4358{
4359 struct p_block_ack *p = (struct p_block_ack *)h;
4360 sector_t sector = be64_to_cpu(p->sector);
4361
4362 if (__ratelimit(&drbd_ratelimit_state))
4363		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4364
4365 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4366
4367 if (is_syncer_block_id(p->block_id)) {
4368 int size = be32_to_cpu(p->blksize);
4369 dec_rs_pending(mdev);
4370 drbd_rs_failed_io(mdev, sector, size);
4371 return TRUE;
4372 }
4373 return validate_req_change_req_state(mdev, p->block_id, sector,
4374 _ack_id_to_req, __func__ , neg_acked);
4375}
4376
4377static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4378{
4379 struct p_block_ack *p = (struct p_block_ack *)h;
4380 sector_t sector = be64_to_cpu(p->sector);
4381
4382 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4383 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4384 (unsigned long long)sector, be32_to_cpu(p->blksize));
4385
4386 return validate_req_change_req_state(mdev, p->block_id, sector,
4387 _ar_id_to_req, __func__ , neg_acked);
4388}
4389
4390static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4391{
4392 sector_t sector;
4393 int size;
4394 struct p_block_ack *p = (struct p_block_ack *)h;
4395
4396 sector = be64_to_cpu(p->sector);
4397 size = be32_to_cpu(p->blksize);
4398
4399 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4400
4401 dec_rs_pending(mdev);
4402
4403 if (get_ldev_if_state(mdev, D_FAILED)) {
4404 drbd_rs_complete_io(mdev, sector);
4405 drbd_rs_failed_io(mdev, sector, size);
4406 put_ldev(mdev);
4407 }
4408
4409 return TRUE;
4410}
4411
4412static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4413{
4414 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4415
4416 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4417
4418 return TRUE;
4419}
4420
4421static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4422{
4423 struct p_block_ack *p = (struct p_block_ack *)h;
4424 struct drbd_work *w;
4425 sector_t sector;
4426 int size;
4427
4428 sector = be64_to_cpu(p->sector);
4429 size = be32_to_cpu(p->blksize);
4430
4431 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4432
4433 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4434 drbd_ov_oos_found(mdev, sector, size);
4435 else
4436 ov_oos_print(mdev);
4437
4438 drbd_rs_complete_io(mdev, sector);
4439 dec_rs_pending(mdev);
4440
4441 if (--mdev->ov_left == 0) {
4442 w = kmalloc(sizeof(*w), GFP_NOIO);
4443 if (w) {
4444 w->cb = w_ov_finished;
4445 drbd_queue_work_front(&mdev->data.work, w);
4446 } else {
4447 dev_err(DEV, "kmalloc(w) failed.");
4448 ov_oos_print(mdev);
4449 drbd_resync_finished(mdev);
4450 }
4451 }
4452 return TRUE;
4453}
4454
4455static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4456{
4457 struct p_delay_probe *p = (struct p_delay_probe *)h;
4458
4459 got_delay_probe(mdev, USE_META_SOCKET, p);
4460 return TRUE;
4461}
4462
4463struct asender_cmd {
4464 size_t pkt_size;
4465 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4466};
4467
4468static struct asender_cmd *get_asender_cmd(int cmd)
4469{
4470 static struct asender_cmd asender_tbl[] = {
4471 /* anything missing from this table is in
4472 * the drbd_cmd_handler (drbd_default_handler) table,
4473 * see the beginning of drbdd() */
4474 [P_PING] = { sizeof(struct p_header), got_Ping },
4475 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4476 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4477 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4478 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4479 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4480 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4481 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4482 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4483 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4484 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4485 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4486 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4487	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_delay_probe_m },
4488	[P_MAX_CMD]         = { 0, NULL },
4489 };
4490 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4491 return NULL;
4492 return &asender_tbl[cmd];
4493}
4494
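/*
 * The asender loop below multiplexes three duties on the meta socket:
 * sending pings when SEND_PING is set, flushing done_ee so that ACKs go
 * out (with optional TCP corking while several ACKs are pending), and
 * reassembling incoming packets with the received/expect counters before
 * dispatching them through get_asender_cmd().
 */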
4495int drbd_asender(struct drbd_thread *thi)
4496{
4497 struct drbd_conf *mdev = thi->mdev;
4498 struct p_header *h = &mdev->meta.rbuf.header;
4499 struct asender_cmd *cmd = NULL;
4500
4501 int rv, len;
4502 void *buf = h;
4503 int received = 0;
4504 int expect = sizeof(struct p_header);
4505 int empty;
4506
4507 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4508
4509 current->policy = SCHED_RR; /* Make this a realtime task! */
4510 current->rt_priority = 2; /* more important than all other tasks */
4511
4512 while (get_t_state(thi) == Running) {
4513 drbd_thread_current_set_cpu(mdev);
4514 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4515 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4516 mdev->meta.socket->sk->sk_rcvtimeo =
4517 mdev->net_conf->ping_timeo*HZ/10;
4518 }
4519
4520 /* conditionally cork;
4521 * it may hurt latency if we cork without much to send */
4522 if (!mdev->net_conf->no_cork &&
4523 3 < atomic_read(&mdev->unacked_cnt))
4524 drbd_tcp_cork(mdev->meta.socket);
4525 while (1) {
4526 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4527 flush_signals(current);
4528 if (!drbd_process_done_ee(mdev)) {
4529 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4530 goto reconnect;
4531 }
4532 /* to avoid race with newly queued ACKs */
4533 set_bit(SIGNAL_ASENDER, &mdev->flags);
4534 spin_lock_irq(&mdev->req_lock);
4535 empty = list_empty(&mdev->done_ee);
4536 spin_unlock_irq(&mdev->req_lock);
4537 /* new ack may have been queued right here,
4538 * but then there is also a signal pending,
4539 * and we start over... */
4540 if (empty)
4541 break;
4542 }
4543 /* but unconditionally uncork unless disabled */
4544 if (!mdev->net_conf->no_cork)
4545 drbd_tcp_uncork(mdev->meta.socket);
4546
4547 /* short circuit, recv_msg would return EINTR anyways. */
4548 if (signal_pending(current))
4549 continue;
4550
4551 rv = drbd_recv_short(mdev, mdev->meta.socket,
4552 buf, expect-received, 0);
4553 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4554
4555 flush_signals(current);
4556
4557 /* Note:
4558 * -EINTR (on meta) we got a signal
4559 * -EAGAIN (on meta) rcvtimeo expired
4560 * -ECONNRESET other side closed the connection
4561 * -ERESTARTSYS (on data) we got a signal
4562 * rv < 0 other than above: unexpected error!
4563 * rv == expected: full header or command
4564 * rv < expected: "woken" by signal during receive
4565 * rv == 0 : "connection shut down by peer"
4566 */
4567 if (likely(rv > 0)) {
4568 received += rv;
4569 buf += rv;
4570 } else if (rv == 0) {
4571 dev_err(DEV, "meta connection shut down by peer.\n");
4572 goto reconnect;
4573 } else if (rv == -EAGAIN) {
4574 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4575 mdev->net_conf->ping_timeo*HZ/10) {
4576 dev_err(DEV, "PingAck did not arrive in time.\n");
4577 goto reconnect;
4578 }
4579 set_bit(SEND_PING, &mdev->flags);
4580 continue;
4581 } else if (rv == -EINTR) {
4582 continue;
4583 } else {
4584 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4585 goto reconnect;
4586 }
4587
4588 if (received == expect && cmd == NULL) {
4589 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4590 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4591 (long)be32_to_cpu(h->magic),
4592 h->command, h->length);
4593 goto reconnect;
4594 }
4595 cmd = get_asender_cmd(be16_to_cpu(h->command));
4596 len = be16_to_cpu(h->length);
4597 if (unlikely(cmd == NULL)) {
4598 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4599 (long)be32_to_cpu(h->magic),
4600 h->command, h->length);
4601 goto disconnect;
4602 }
4603 expect = cmd->pkt_size;
4604			ERR_IF(len != expect-sizeof(struct p_header))
4605				goto reconnect;
4606		}
4607 if (received == expect) {
4608 D_ASSERT(cmd != NULL);
4609			if (!cmd->process(mdev, h))
4610 goto reconnect;
4611
4612 buf = h;
4613 received = 0;
4614 expect = sizeof(struct p_header);
4615 cmd = NULL;
4616 }
4617 }
4618
4619 if (0) {
4620reconnect:
4621 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4622 }
4623 if (0) {
4624disconnect:
4625 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4626 }
4627 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4628
4629 D_ASSERT(mdev->state.conn < C_CONNECTED);
4630 dev_info(DEV, "asender terminated\n");
4631
4632 return 0;
4633}