/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/smp_lock.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct flush_work {
	struct drbd_work w;
	struct drbd_epoch *epoch;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);

static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
{
	struct drbd_epoch *prev;
	spin_lock(&mdev->epoch_lock);
	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
	if (prev == epoch || prev == mdev->current_epoch)
		prev = NULL;
	spin_unlock(&mdev->epoch_lock);
	return prev;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

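/* Put every page of the chain and return the number of pages released.
 * The chain must not be reachable from any shared list while this runs. */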
static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

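/* Prepend the chain chain_first..chain_last to the list at *head.
 * Locking (drbd_pp_lock for the global pool) is the responsibility
 * of the caller. */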
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

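/* Try to hand out a chain of @number pages: first from the global
 * drbd_pp_pool, then, if that cannot satisfy the request, by allocating
 * fresh pages with GFP_TRY.  Returns NULL (after giving any partially
 * allocated pages back to the pool) if not all pages are available. */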
static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_pp_alloc will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

/* kick lower level device, if we have more than (arbitrary number)
 * reference counts on it, which typically are locally submitted io
 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
static void maybe_kick_lo(struct drbd_conf *mdev)
{
	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
		drbd_kick_lo(mdev);
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
	struct drbd_epoch_entry *e;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list... */

	list_for_each_safe(le, tle, &mdev->net_ee) {
		e = list_entry(le, struct drbd_epoch_entry, w.list);
		if (drbd_ee_has_active_page(e))
			break;
		list_move(le, to_be_freed);
	}
}

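/* Unplug the lower level device if needed, then reclaim and free all
 * net_ee entries whose pages are no longer referenced by in-flight sends. */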
static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
	LIST_HEAD(reclaimed);
	struct drbd_epoch_entry *e, *t;

	maybe_kick_lo(mdev);
	spin_lock_irq(&mdev->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	spin_unlock_irq(&mdev->req_lock);

	list_for_each_entry_safe(e, t, &reclaimed, w.list)
		drbd_free_ee(mdev, e);
}

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
	struct page *page = NULL;
	DEFINE_WAIT(wait);

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
		page = drbd_pp_first_pages_or_try_alloc(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
			break;
		}

		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &mdev->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
{
	int i;
	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	atomic_sub(i, &mdev->pp_in_use);
	i = atomic_read(&mdev->pp_in_use);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_ee()
 drbd_alloc_ee()
 drbd_init_ee()
 drbd_release_ee()
 drbd_ee_fix_bhs()
 drbd_process_done_ee()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

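/* Allocate an epoch entry plus a page chain large enough for @data_size
 * bytes of payload.  Returns NULL on allocation failure, or when fault
 * injection for DRBD_FAULT_AL_EE is active. */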
struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
				     u64 id,
				     sector_t sector,
				     unsigned int data_size,
				     gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_epoch_entry *e;
	struct page *page;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
		return NULL;

	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!e) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
		return NULL;
	}

	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
	if (!page)
		goto fail;

	INIT_HLIST_NODE(&e->colision);
	e->epoch = NULL;
	e->mdev = mdev;
	e->pages = page;
	atomic_set(&e->pending_bios, 0);
	e->size = data_size;
	e->flags = 0;
	e->sector = sector;
	e->block_id = id;

	return e;

 fail:
	mempool_free(e, drbd_ee_mempool);
	return NULL;
}

void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (e->flags & EE_HAS_DIGEST)
		kfree(e->digest);
	drbd_pp_free(mdev, e->pages);
	D_ASSERT(atomic_read(&e->pending_bios) == 0);
	D_ASSERT(hlist_unhashed(&e->colision));
	mempool_free(e, drbd_ee_mempool);
}

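/* Splice all entries off @list under the req_lock, free them outside the
 * lock, and return the number of entries freed. */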
int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_epoch_entry *e, *t;
	int count = 0;

	spin_lock_irq(&mdev->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->req_lock);

	list_for_each_entry_safe(e, t, &work_list, w.list) {
		drbd_free_ee(mdev, e);
		count++;
	}
	return count;
}


/*
 * This function is called from _asender only_
 * but see also comments in _req_mod(,barrier_acked)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_epoch_entry *e, *t;
	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

	spin_lock_irq(&mdev->req_lock);
	reclaim_net_ee(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->req_lock);

	list_for_each_entry_safe(e, t, &reclaimed, w.list)
		drbd_free_ee(mdev, e);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(e, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		ok = e->w.cb(mdev, &e->w, !ok) && ok;
		drbd_free_ee(mdev, e);
	}
	wake_up(&mdev->ee_wait);

	return ok;
}

void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->req_lock);
		drbd_kick_lo(mdev);
		schedule();
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->req_lock);
	}
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->req_lock);
}

/* see also kernel_accept, which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(struct drbd_conf *mdev, const char **what,
		struct socket *sock, struct socket **newsock)
{
	struct sock *sk = sock->sk;
	int err = 0;

	*what = "listen";
	err = sock->ops->listen(sock, 5);
	if (err < 0)
		goto out;

	*what = "sock_create_lite";
	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
			       newsock);
	if (err < 0)
		goto out;

	*what = "accept";
	err = sock->ops->accept(sock, *newsock, 0);
	if (err < 0) {
		sock_release(*newsock);
		*newsock = NULL;
		goto out;
	}
	(*newsock)->ops = sock->ops;

out:
	return err;
}

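/* Receive up to @size bytes from @sock into @buf with a single
 * sock_recvmsg() call.  With no flags given, MSG_WAITALL blocks until the
 * full amount arrived; returns whatever sock_recvmsg() returned. */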
static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
		    void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

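/* Receive exactly @size bytes from the data socket (MSG_WAITALL).
 * Anything short of that (peer close, signal, or error) forces the
 * connection state to C_BROKEN_PIPE.  Returns the sock_recvmsg() result. */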
static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
	};
	int rv;

	oldfs = get_fs();
	set_fs(KERNEL_DS);

	for (;;) {
		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
		if (rv == size)
			break;

		/* Note:
		 * ECONNRESET	other side closed the connection
		 * ERESTARTSYS	(on  sock) we got a signal
		 */

		if (rv < 0) {
			if (rv == -ECONNRESET)
				dev_info(DEV, "sock was reset by peer\n");
			else if (rv != -ERESTARTSYS)
				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
			break;
		} else if (rv == 0) {
			dev_info(DEV, "sock was shut down by peer\n");
			break;
		} else {
			/* signal came in, or peer/link went down,
			 * after we read a partial message
			 */
			/* D_ASSERT(signal_pending(current)); */
			break;
		}
	};

	set_fs(oldfs);

	if (rv != size)
		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));

	return rv;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
		unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

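/* Actively set up an outgoing TCP connection to the configured peer.
 * Binds to the locally configured address first (with port 0), so
 * multihomed hosts use the intended source IP.  Returns the connected
 * socket or NULL; non-transient errors force C_DISCONNECTING. */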
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	int err;
	int disconnect_on_error = 1;

	if (!get_net_conf(mdev))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	memcpy(&src_in6, mdev->net_conf->my_addr,
	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	what = "bind before connect";
	err = sock->ops->bind(sock,
			      (struct sockaddr *) &src_in6,
			      mdev->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock,
				 (struct sockaddr *)mdev->net_conf->peer_addr,
				 mdev->net_conf->peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			dev_err(DEV, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
	}
	put_net_conf(mdev);
	return sock;
}

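/* Passively wait for an incoming connection on the configured local
 * address, using a randomly jittered listen timeout.  Returns the
 * accepted socket, or NULL on error or timeout. */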
static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
{
	int timeo, err;
	struct socket *s_estab = NULL, *s_listen;
	const char *what;

	if (!get_net_conf(mdev))
		return NULL;

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
		SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	timeo = mdev->net_conf->try_connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
			mdev->net_conf->rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen,
			      (struct sockaddr *) mdev->net_conf->my_addr,
			      mdev->net_conf->my_addr_len);
	if (err < 0)
		goto out;

	err = drbd_accept(mdev, &what, s_listen, &s_estab);

out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			dev_err(DEV, "%s failed, err = %d\n", what, err);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
		}
	}
	put_net_conf(mdev);

	return s_estab;
}

static int drbd_send_fp(struct drbd_conf *mdev,
	struct socket *sock, enum drbd_packets cmd)
{
	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;

	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
}

static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
{
	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
	int rr;

	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);

	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
		return be16_to_cpu(h->command);

	return 0xffff;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @mdev:	DRBD device.
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return FALSE;

	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return TRUE;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return FALSE;
	}
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_conf *mdev)
{
	struct socket *s, *sock, *msock;
	int try, h, ok;

	D_ASSERT(!mdev->data.socket);

	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
		return -2;

	clear_bit(DISCARD_CONCURRENT, &mdev->flags);

	sock  = NULL;
	msock = NULL;

	do {
		for (try = 0;;) {
			/* 3 tries, this should take less than a second! */
			s = drbd_try_connect(mdev);
			if (s || ++try >= 3)
				break;
			/* give the other side time to call bind() & listen() */
			__set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(HZ / 10);
		}

		if (s) {
			if (!sock) {
				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
				sock = s;
				s = NULL;
			} else if (!msock) {
				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
				msock = s;
				s = NULL;
			} else {
				dev_err(DEV, "Logic error in drbd_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock && msock) {
			__set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(HZ / 10);
			ok = drbd_socket_okay(mdev, &sock);
			ok = drbd_socket_okay(mdev, &msock) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(mdev);
		if (s) {
			try = drbd_recv_fp(mdev, s);
			drbd_socket_okay(mdev, &sock);
			drbd_socket_okay(mdev, &msock);
			switch (try) {
			case P_HAND_SHAKE_S:
				if (sock) {
					dev_warn(DEV, "initial packet S crossed\n");
					sock_release(sock);
				}
				sock = s;
				break;
			case P_HAND_SHAKE_M:
				if (msock) {
					dev_warn(DEV, "initial packet M crossed\n");
					sock_release(msock);
				}
				msock = s;
				set_bit(DISCARD_CONCURRENT, &mdev->flags);
				break;
			default:
				dev_warn(DEV, "Error receiving initial packet\n");
				sock_release(s);
				if (random32() & 1)
					goto retry;
			}
		}

		if (mdev->state.conn <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&mdev->receiver) == Exiting)
				goto out_release_sockets;
		}

		if (sock && msock) {
			ok = drbd_socket_okay(mdev, &sock);
			ok = drbd_socket_okay(mdev, &msock) && ok;
			if (ok)
				break;
		}
	} while (1);

	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock->sk->sk_allocation = GFP_NOIO;
	msock->sk->sk_allocation = GFP_NOIO;

	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_HAND_SHAKE timeout,
	 * which we set to 4x the configured ping_timeout. */
	sock->sk->sk_sndtimeo =
	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;

	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock);
	drbd_tcp_nodelay(msock);

	mdev->data.socket = sock;
	mdev->meta.socket = msock;
	mdev->last_received = jiffies;

	D_ASSERT(mdev->asender.task == NULL);

	h = drbd_do_handshake(mdev);
	if (h <= 0)
		return h;

	if (mdev->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(mdev)) {
		case -1:
			dev_err(DEV, "Authentication of peer failed\n");
			return -1;
		case 0:
			dev_err(DEV, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
		return 0;

	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	atomic_set(&mdev->packet_seq, 0);
	mdev->peer_seq = 0;

	drbd_thread_start(&mdev->asender);

	if (!drbd_send_protocol(mdev))
		return -1;
	drbd_send_sync_param(mdev, &mdev->sync_conf);
	drbd_send_sizes(mdev, 0, 0);
	drbd_send_uuids(mdev);
	drbd_send_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);

	return 1;

out_release_sockets:
	if (sock)
		sock_release(sock);
	if (msock)
		sock_release(msock);
	return -1;
}

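/* Receive one packet header from the data socket and convert it to host
 * byte order.  Returns FALSE on a short read or a bad magic. */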
static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
{
	int r;

	r = drbd_recv(mdev, h, sizeof(*h));

	if (unlikely(r != sizeof(*h))) {
		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
		return FALSE;
	};
	h->command = be16_to_cpu(h->command);
	h->length  = be16_to_cpu(h->length);
	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
		    (long)be32_to_cpu(h->magic),
		    h->command, h->length);
		return FALSE;
	}
	mdev->last_received = jiffies;

	return TRUE;
}

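/* Flush the backing device if the current write ordering policy asks for
 * it (falling back to draining on failure), then report EV_BARRIER_DONE
 * to drbd_may_finish_epoch(). */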
static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
{
	int rv;

	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
					NULL, BLKDEV_IFL_WAIT);
		if (rv) {
			dev_err(DEV, "local disk flush failed with status %d\n", rv);
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			drbd_bump_write_ordering(mdev, WO_drain_io);
		}
		put_ldev(mdev);
	}

	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
}

static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct flush_work *fw = (struct flush_work *)w;
	struct drbd_epoch *epoch = fw->epoch;

	kfree(w);

	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
		drbd_flush_after_epoch(mdev, epoch);

	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));

	return 1;
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int finish, epoch_size;
	struct drbd_epoch *next_epoch;
	int schedule_flush = 0;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&mdev->epoch_lock);
	do {
		next_epoch = NULL;
		finish = 0;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);

			/* Special case: If we just switched from WO_bio_barrier to
			   WO_bdev_flush we should not finish the current epoch */
			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
			    mdev->write_ordering != WO_bio_barrier &&
			    epoch == mdev->current_epoch)
				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
			break;
		case EV_BARRIER_DONE:
			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do */
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
		    epoch->list.prev == &mdev->current_epoch->list &&
		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
			/* Nearly all conditions are met to finish that epoch... */
			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
			    mdev->write_ordering == WO_none ||
			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
			    ev & EV_CLEANUP) {
				finish = 1;
				set_bit(DE_IS_FINISHING, &epoch->flags);
			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
				 mdev->write_ordering == WO_bio_barrier) {
				atomic_inc(&epoch->active);
				schedule_flush = 1;
			}
		}
		if (finish) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&mdev->epoch_lock);
				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
				spin_lock(&mdev->epoch_lock);
			}
			dec_unacked(mdev);

			if (mdev->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				mdev->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&mdev->epoch_lock);

	if (schedule_flush) {
		struct flush_work *fw;
		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
		if (fw) {
			fw->w.cb = w_flush;
			fw->epoch = epoch;
			drbd_queue_work(&mdev->data.work, &fw->w);
		} else {
			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
			/* That is not a recursion, only one level */
			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
		}
	}

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:	DRBD device.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
		[WO_bio_barrier] = "barrier",
	};

	pwo = mdev->write_ordering;
	wo = min(pwo, wo);
	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
		wo = WO_bdev_flush;
	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
		wo = WO_drain_io;
	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
		wo = WO_none;
	mdev->write_ordering = wo;
	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

/**
 * drbd_submit_ee() - Submit the page chain of an epoch entry to the local disk
 * @mdev:	DRBD device.
 * @e:		epoch entry
 * @rw:		flag field, see bio->bi_rw
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
		const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = e->pages;
	sector_t sector = e->sector;
	unsigned ds = e->size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio. */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > e->sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	/* we special case some flags in the multi-bio case, see below
	 * (REQ_UNPLUG, REQ_HARDBARRIER) */
	bio->bi_rw = rw;
	bio->bi_private = e;
	bio->bi_end_io = drbd_endio_sec;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* a single page must always be possible! */
			BUG_ON(bio->bi_vcnt == 0);
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	atomic_set(&e->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		/* strip off REQ_UNPLUG unless it is the last bio */
		if (bios)
			bio->bi_rw &= ~REQ_UNPLUG;

		drbd_generic_make_request(mdev, fault_type, bio);

		/* strip off REQ_HARDBARRIER,
		 * unless it is the first or last bio */
		if (bios && bios->bi_next)
			bios->bi_rw &= ~REQ_HARDBARRIER;
	} while (bios);
	maybe_kick_lo(mdev);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return -ENOMEM;
}

/**
 * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways (unused in this callback)
 */
int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
{
	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
	   so that we can finish that epoch in drbd_may_finish_epoch().
	   That is necessary if we already have a long chain of Epochs, before
	   we realize that REQ_HARDBARRIER is actually not supported */

	/* As long as the -ENOTSUPP on the barrier is reported immediately
	   that will never trigger. If it is reported late, we will just
	   print that warning and continue correctly for all future requests
	   with WO_bdev_flush */
	if (previous_epoch(mdev, e->epoch))
		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");

	/* we still have a local reference,
	 * get_ldev was done in receive_Data. */

	e->w.cb = e_end_block;
	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
		/* drbd_submit_ee fails for one reason only:
		 * it was not able to allocate sufficient bios.
		 * requeue, try again later. */
		e->w.cb = w_e_reissue;
		drbd_queue_work(&mdev->data.work, &e->w);
	}
	return 1;
}

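/* Handle an incoming P_BARRIER packet: try to finish or recycle the
 * current epoch according to the active write ordering method, and
 * install a fresh epoch object for the writes that follow when needed. */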
static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
{
	int rv, issue_flush;
	struct p_barrier *p = (struct p_barrier *)h;
	struct drbd_epoch *epoch;

	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;

	rv = drbd_recv(mdev, h->payload, h->length);
	ERR_IF(rv != h->length) return FALSE;

	inc_unacked(mdev);

	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
		drbd_kick_lo(mdev);

	mdev->current_epoch->barrier_nr = p->barrier;
	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (mdev->write_ordering) {
	case WO_bio_barrier:
	case WO_none:
		if (rv == FE_RECYCLED)
			return TRUE;
		break;

	case WO_bdev_flush:
	case WO_drain_io:
		if (rv == FE_STILL_LIVE) {
			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
			drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
		}
		if (rv == FE_RECYCLED)
			return TRUE;

		/* The asender will send all the ACKs and barrier ACKs out, since
		   all EEs moved from the active_ee to the done_ee. We need to
		   provide a new epoch object for the EEs that come in soon */
		break;
	}

	/* receiver context, in the writeout path of the other node.
	 * avoid potential distributed deadlock */
	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
	if (!epoch) {
		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		if (issue_flush) {
			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
			if (rv == FE_RECYCLED)
				return TRUE;
		}

		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);

		return TRUE;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&mdev->epoch_lock);
	if (atomic_read(&mdev->current_epoch->epoch_size)) {
		list_add(&epoch->list, &mdev->current_epoch->list);
		mdev->current_epoch = epoch;
		mdev->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&mdev->epoch_lock);

	return TRUE;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_epoch_entry *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_epoch_entry *e;
	struct page *page;
	int dgs, ds, rr;
	void *dig_in = mdev->int_dig_in;
	void *dig_vv = mdev->int_dig_vv;
	unsigned long *data;

	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev, dig_in, dgs);
		if (rr != dgs) {
			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
			     rr, dgs);
			return NULL;
		}
	}

	data_size -= dgs;

	ERR_IF(data_size & 0x1ff) return NULL;
	ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
	if (!e)
		return NULL;

	ds = data_size;
	page = e->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		rr = drbd_recv(mdev, data, len);
		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (rr != len) {
			drbd_free_ee(mdev, e);
			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
			     rr, len);
			return NULL;
		}
		ds -= rr;
	}

	if (dgs) {
		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED.\n");
			drbd_bcast_ee(mdev, "digest failed",
					dgs, dig_in, dig_vv, e);
			drbd_free_ee(mdev, e);
			return NULL;
		}
	}
	mdev->recv_cnt += data_size>>9;
	return e;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
	struct page *page;
	int rr, rv = 1;
	void *data;

	if (!data_size)
		return TRUE;

	page = drbd_pp_alloc(mdev, 1, 1);

	data = kmap(page);
	while (data_size) {
		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
		if (rr != min_t(int, data_size, PAGE_SIZE)) {
			rv = 0;
			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
			     rr, min_t(int, data_size, PAGE_SIZE));
			break;
		}
		data_size -= rr;
	}
	kunmap(page);
	drbd_pp_free(mdev, page);
	return rv;
}

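/* Receive the payload of a data reply directly into the bio of the
 * original local read request, verifying the data digest if one is
 * configured. */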
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec *bvec;
	struct bio *bio;
	int dgs, rr, i, expect;
	void *dig_in = mdev->int_dig_in;
	void *dig_vv = mdev->int_dig_vv;

	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

	if (dgs) {
		rr = drbd_recv(mdev, dig_in, dgs);
		if (rr != dgs) {
			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
			     rr, dgs);
			return 0;
		}
	}

	data_size -= dgs;

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		expect = min_t(int, data_size, bvec->bv_len);
		rr = drbd_recv(mdev,
			     kmap(bvec->bv_page)+bvec->bv_offset,
			     expect);
		kunmap(bvec->bv_page);
		if (rr != expect) {
			dev_warn(DEV, "short read receiving data reply: "
			     "read %d expected %d\n",
			     rr, expect);
			return 0;
		}
		data_size -= rr;
	}

	if (dgs) {
		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return 0;
		}
	}

	D_ASSERT(data_size == 0);
	return 1;
}

/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
	sector_t sector = e->sector;
	int ok;

	D_ASSERT(hlist_unhashed(&e->colision));

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, e->size);
		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, e->size);

		ok = drbd_send_ack(mdev, P_NEG_ACK, e);
	}
	dec_unacked(mdev);

	return ok;
}

static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
	struct drbd_epoch_entry *e;

	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
	if (!e)
		goto fail;

	dec_rs_pending(mdev);

	inc_unacked(mdev);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	e->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->req_lock);

	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
		return TRUE;

	drbd_free_ee(mdev, e);
fail:
	put_ldev(mdev);
	return FALSE;
}

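/* Handle a P_DATA_REPLY packet: look up the pending application read
 * request this reply answers and feed the payload into it via
 * recv_dless_read(). */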
1573static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1574{
1575 struct drbd_request *req;
1576 sector_t sector;
1577 unsigned int header_size, data_size;
1578 int ok;
1579 struct p_data *p = (struct p_data *)h;
1580
1581 header_size = sizeof(*p) - sizeof(*h);
1582 data_size = h->length - header_size;
1583
1584 ERR_IF(data_size == 0) return FALSE;
1585
1586 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1587 return FALSE;
1588
1589 sector = be64_to_cpu(p->sector);
1590
1591 spin_lock_irq(&mdev->req_lock);
1592 req = _ar_id_to_req(mdev, p->block_id, sector);
1593 spin_unlock_irq(&mdev->req_lock);
1594 if (unlikely(!req)) {
1595 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1596 return FALSE;
1597 }
1598
1599 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1600 * special casing it there for the various failure cases.
1601 * still no race with drbd_fail_pending_reads */
1602 ok = recv_dless_read(mdev, req, sector, data_size);
1603
1604 if (ok)
1605 req_mod(req, data_received);
1606 /* else: nothing. handled from drbd_disconnect...
1607 * I don't think we may complete this just yet
1608 * in case we are "on-disconnect: freeze" */
1609
1610 return ok;
1611}
1612
1613static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1614{
1615 sector_t sector;
1616 unsigned int header_size, data_size;
1617 int ok;
1618 struct p_data *p = (struct p_data *)h;
1619
1620 header_size = sizeof(*p) - sizeof(*h);
1621 data_size = h->length - header_size;
1622
1623 ERR_IF(data_size == 0) return FALSE;
1624
1625 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1626 return FALSE;
1627
1628 sector = be64_to_cpu(p->sector);
1629 D_ASSERT(p->block_id == ID_SYNCER);
1630
1631 if (get_ldev(mdev)) {
1632 /* data is submitted to disk within recv_resync_read.
1633 * corresponding put_ldev done below on error,
1634 * or in drbd_endio_write_sec. */
1635 ok = recv_resync_read(mdev, sector, data_size);
1636 } else {
1637 if (__ratelimit(&drbd_ratelimit_state))
1638 dev_err(DEV, "Can not write resync data to local disk.\n");
1639
1640 ok = drbd_drain_block(mdev, data_size);
1641
1642 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1643 }
1644
Philipp Reisner778f2712010-07-06 11:14:00 +02001645 atomic_add(data_size >> 9, &mdev->rs_sect_in);
1646
Philipp Reisnerb411b362009-09-25 16:07:19 -07001647 return ok;
1648}
1649
1650/* e_end_block() is called via drbd_process_done_ee().
1651 * this means this function only runs in the asender thread
1652 */
1653static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1654{
1655 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1656 sector_t sector = e->sector;
1657 struct drbd_epoch *epoch;
1658 int ok = 1, pcmd;
1659
1660 if (e->flags & EE_IS_BARRIER) {
1661 epoch = previous_epoch(mdev, e->epoch);
1662 if (epoch)
1663 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1664 }
1665
1666 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
Lars Ellenberg45bb9122010-05-14 17:10:48 +02001667 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
Philipp Reisnerb411b362009-09-25 16:07:19 -07001668 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1669 mdev->state.conn <= C_PAUSED_SYNC_T &&
1670 e->flags & EE_MAY_SET_IN_SYNC) ?
1671 P_RS_WRITE_ACK : P_WRITE_ACK;
1672 ok &= drbd_send_ack(mdev, pcmd, e);
1673 if (pcmd == P_RS_WRITE_ACK)
1674 drbd_set_in_sync(mdev, sector, e->size);
1675 } else {
1676 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1677 /* we expect it to be marked out of sync anyways...
1678 * maybe assert this? */
1679 }
1680 dec_unacked(mdev);
1681 }
1682 /* we delete from the conflict detection hash _after_ we sent out the
1683 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1684 if (mdev->net_conf->two_primaries) {
1685 spin_lock_irq(&mdev->req_lock);
1686 D_ASSERT(!hlist_unhashed(&e->colision));
1687 hlist_del_init(&e->colision);
1688 spin_unlock_irq(&mdev->req_lock);
1689 } else {
1690 D_ASSERT(hlist_unhashed(&e->colision));
1691 }
1692
1693 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1694
1695 return ok;
1696}
1697
1698static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1699{
1700 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1701 int ok = 1;
1702
1703 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1704 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1705
1706 spin_lock_irq(&mdev->req_lock);
1707 D_ASSERT(!hlist_unhashed(&e->colision));
1708 hlist_del_init(&e->colision);
1709 spin_unlock_irq(&mdev->req_lock);
1710
1711 dec_unacked(mdev);
1712
1713 return ok;
1714}
1715
1716/* Called from receive_Data.
1717 * Synchronize packets on sock with packets on msock.
1718 *
1719 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1720 * packet traveling on msock, they are still processed in the order they have
1721 * been sent.
1722 *
1723 * Note: we don't care for Ack packets overtaking P_DATA packets.
1724 *
1725 * In case packet_seq is larger than mdev->peer_seq number, there are
1726 * outstanding packets on the msock. We wait for them to arrive.
1727 * In case we are the logically next packet, we update mdev->peer_seq
1728 * ourselves. Correctly handles 32bit wrap around.
1729 *
1730 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1731 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1732 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1733 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1734 *
1735 * returns 0 if we may process the packet,
1736 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1737static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1738{
1739 DEFINE_WAIT(wait);
1740 unsigned int p_seq;
1741 long timeout;
1742 int ret = 0;
1743 spin_lock(&mdev->peer_seq_lock);
1744 for (;;) {
1745 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1746 if (seq_le(packet_seq, mdev->peer_seq+1))
1747 break;
1748 if (signal_pending(current)) {
1749 ret = -ERESTARTSYS;
1750 break;
1751 }
1752 p_seq = mdev->peer_seq;
1753 spin_unlock(&mdev->peer_seq_lock);
1754 timeout = schedule_timeout(30*HZ);
1755 spin_lock(&mdev->peer_seq_lock);
1756 if (timeout == 0 && p_seq == mdev->peer_seq) {
1757 ret = -ETIMEDOUT;
1758 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1759 break;
1760 }
1761 }
1762 finish_wait(&mdev->seq_wait, &wait);
1763 if (mdev->peer_seq+1 == packet_seq)
1764 mdev->peer_seq++;
1765 spin_unlock(&mdev->peer_seq_lock);
1766 return ret;
1767}
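/* Worked example of the wrap handling in drbd_wait_peer_seq() above, assuming
 * seq_le() compares sequence numbers modulo 2^32 as the comment claims:
 * with mdev->peer_seq == 0xffffffff, an incoming packet_seq of 0 is the
 * logically next packet.  peer_seq + 1 wraps to 0, so seq_le(0, 0) holds and
 * we leave the wait loop; the final "peer_seq + 1 == packet_seq" check also
 * wraps to 0 == 0, and peer_seq advances to 0. */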
1768
1769/* mirrored write */
1770static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1771{
1772 sector_t sector;
1773 struct drbd_epoch_entry *e;
1774 struct p_data *p = (struct p_data *)h;
1775 int header_size, data_size;
1776 int rw = WRITE;
1777 u32 dp_flags;
1778
1779 header_size = sizeof(*p) - sizeof(*h);
1780 data_size = h->length - header_size;
1781
1782 ERR_IF(data_size == 0) return FALSE;
1783
1784 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1785 return FALSE;
1786
1787 if (!get_ldev(mdev)) {
1788 if (__ratelimit(&drbd_ratelimit_state))
1789 dev_err(DEV, "Can not write mirrored data block "
1790 "to local disk.\n");
1791 spin_lock(&mdev->peer_seq_lock);
1792 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1793 mdev->peer_seq++;
1794 spin_unlock(&mdev->peer_seq_lock);
1795
1796 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1797 atomic_inc(&mdev->current_epoch->epoch_size);
1798 return drbd_drain_block(mdev, data_size);
1799 }
1800
1801 /* get_ldev(mdev) successful.
1802 * Corresponding put_ldev done either below (on various errors),
1803 * or in drbd_endio_write_sec, if we successfully submit the data at
1804 * the end of this function. */
1805
1806 sector = be64_to_cpu(p->sector);
1807 e = read_in_block(mdev, p->block_id, sector, data_size);
1808 if (!e) {
1809 put_ldev(mdev);
1810 return FALSE;
1811 }
1812
1813 e->w.cb = e_end_block;
1814
1815 spin_lock(&mdev->epoch_lock);
1816 e->epoch = mdev->current_epoch;
1817 atomic_inc(&e->epoch->epoch_size);
1818 atomic_inc(&e->epoch->active);
1819
1820 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1821 struct drbd_epoch *epoch;
1822 /* Issue a barrier if we start a new epoch, and the previous epoch
1823 was not an epoch containing a single request which already was
1824 a Barrier. */
1825 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1826 if (epoch == e->epoch) {
1827 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1828 rw |= REQ_HARDBARRIER;
1829 e->flags |= EE_IS_BARRIER;
1830 } else {
1831 if (atomic_read(&epoch->epoch_size) > 1 ||
1832 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1833 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1834 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1835 rw |= REQ_HARDBARRIER;
1836 e->flags |= EE_IS_BARRIER;
1837 }
1838 }
1839 }
1840 spin_unlock(&mdev->epoch_lock);
1841
1842 dp_flags = be32_to_cpu(p->dp_flags);
1843 if (dp_flags & DP_HARDBARRIER) {
1844 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1845 /* rw |= REQ_HARDBARRIER; */
1846 }
1847 if (dp_flags & DP_RW_SYNC)
1848 rw |= REQ_SYNC | REQ_UNPLUG;
1849 if (dp_flags & DP_MAY_SET_IN_SYNC)
1850 e->flags |= EE_MAY_SET_IN_SYNC;
1851
1852 /* I'm the receiver, I do hold a net_cnt reference. */
1853 if (!mdev->net_conf->two_primaries) {
1854 spin_lock_irq(&mdev->req_lock);
1855 } else {
1856 /* don't get the req_lock yet,
1857 * we may sleep in drbd_wait_peer_seq */
1858 const int size = e->size;
1859 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1860 DEFINE_WAIT(wait);
1861 struct drbd_request *i;
1862 struct hlist_node *n;
1863 struct hlist_head *slot;
1864 int first;
1865
1866 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1867 BUG_ON(mdev->ee_hash == NULL);
1868 BUG_ON(mdev->tl_hash == NULL);
1869
1870 /* conflict detection and handling:
1871 * 1. wait on the sequence number,
1872 * in case this data packet overtook ACK packets.
1873 * 2. check our hash tables for conflicting requests.
1874 * we only need to walk the tl_hash, since an ee can not
1875 * have a conflict with another ee: on the submitting
1876 * node, the corresponding req had already been conflicting,
1877 * and a conflicting req is never sent.
1878 *
1879 * Note: for two_primaries, we are protocol C,
1880 * so there cannot be any request that is DONE
1881 * but still on the transfer log.
1882 *
1883 * unconditionally add to the ee_hash.
1884 *
1885 * if no conflicting request is found:
1886 * submit.
1887 *
1888 * if any conflicting request is found
1889 * that has not yet been acked,
1890 * AND I have the "discard concurrent writes" flag:
1891 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1892 *
1893 * if any conflicting request is found:
1894 * block the receiver, waiting on misc_wait
1895 * until no more conflicting requests are there,
1896 * or we get interrupted (disconnect).
1897 *
1898 * we do not just write after local io completion of those
1899 * requests, but only after req is done completely, i.e.
1900 * we wait for the P_DISCARD_ACK to arrive!
1901 *
1902 * then proceed normally, i.e. submit.
1903 */
1904 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1905 goto out_interrupted;
1906
1907 spin_lock_irq(&mdev->req_lock);
1908
1909 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1910
1911#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1912 slot = tl_hash_slot(mdev, sector);
1913 first = 1;
1914 for (;;) {
1915 int have_unacked = 0;
1916 int have_conflict = 0;
1917 prepare_to_wait(&mdev->misc_wait, &wait,
1918 TASK_INTERRUPTIBLE);
1919 hlist_for_each_entry(i, n, slot, colision) {
1920 if (OVERLAPS) {
1921 /* only ALERT on first iteration,
1922 * we may be woken up early... */
1923 if (first)
1924 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1925 " new: %llus +%u; pending: %llus +%u\n",
1926 current->comm, current->pid,
1927 (unsigned long long)sector, size,
1928 (unsigned long long)i->sector, i->size);
1929 if (i->rq_state & RQ_NET_PENDING)
1930 ++have_unacked;
1931 ++have_conflict;
1932 }
1933 }
1934#undef OVERLAPS
1935 if (!have_conflict)
1936 break;
1937
1938 /* Discard Ack only for the _first_ iteration */
1939 if (first && discard && have_unacked) {
1940 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1941 (unsigned long long)sector);
1942 inc_unacked(mdev);
1943 e->w.cb = e_send_discard_ack;
1944 list_add_tail(&e->w.list, &mdev->done_ee);
1945
1946 spin_unlock_irq(&mdev->req_lock);
1947
1948 /* we could probably send that P_DISCARD_ACK ourselves,
1949 * but I don't like the receiver using the msock */
1950
1951 put_ldev(mdev);
1952 wake_asender(mdev);
1953 finish_wait(&mdev->misc_wait, &wait);
1954 return TRUE;
1955 }
1956
1957 if (signal_pending(current)) {
1958 hlist_del_init(&e->colision);
1959
1960 spin_unlock_irq(&mdev->req_lock);
1961
1962 finish_wait(&mdev->misc_wait, &wait);
1963 goto out_interrupted;
1964 }
1965
1966 spin_unlock_irq(&mdev->req_lock);
1967 if (first) {
1968 first = 0;
1969 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1970 "sec=%llus\n", (unsigned long long)sector);
1971 } else if (discard) {
1972 /* we had none on the first iteration.
1973 * there must be none now. */
1974 D_ASSERT(have_unacked == 0);
1975 }
1976 schedule();
1977 spin_lock_irq(&mdev->req_lock);
1978 }
1979 finish_wait(&mdev->misc_wait, &wait);
1980 }
1981
1982 list_add(&e->w.list, &mdev->active_ee);
1983 spin_unlock_irq(&mdev->req_lock);
1984
1985 switch (mdev->net_conf->wire_protocol) {
1986 case DRBD_PROT_C:
1987 inc_unacked(mdev);
1988 /* corresponding dec_unacked() in e_end_block()
1989 * respective _drbd_clear_done_ee */
1990 break;
1991 case DRBD_PROT_B:
1992 /* I really don't like it that the receiver thread
1993 * sends on the msock, but anyways */
1994 drbd_send_ack(mdev, P_RECV_ACK, e);
1995 break;
1996 case DRBD_PROT_A:
1997 /* nothing to do */
1998 break;
1999 }
2000
2001 if (mdev->state.pdsk == D_DISKLESS) {
2002 /* In case we have the only disk of the cluster, */
2003 drbd_set_out_of_sync(mdev, e->sector, e->size);
2004 e->flags |= EE_CALL_AL_COMPLETE_IO;
2005 drbd_al_begin_io(mdev, e->sector);
2006 }
2007
2008 if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2009 return TRUE;
2010
2011out_interrupted:
2012 /* yes, the epoch_size now is imbalanced.
2013 * but we drop the connection anyways, so we don't have a chance to
2014 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2015 put_ldev(mdev);
2016 drbd_free_ee(mdev, e);
2017 return FALSE;
2018}
2019
2020static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2021{
2022 sector_t sector;
2023 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2024 struct drbd_epoch_entry *e;
2025 struct digest_info *di = NULL;
2026 int size, digest_size;
2027 unsigned int fault_type;
2028 struct p_block_req *p =
2029 (struct p_block_req *)h;
2030 const int brps = sizeof(*p)-sizeof(*h);
2031
2032 if (drbd_recv(mdev, h->payload, brps) != brps)
2033 return FALSE;
2034
2035 sector = be64_to_cpu(p->sector);
2036 size = be32_to_cpu(p->blksize);
2037
2038 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2039 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2040 (unsigned long long)sector, size);
2041 return FALSE;
2042 }
2043 if (sector + (size>>9) > capacity) {
2044 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2045 (unsigned long long)sector, size);
2046 return FALSE;
2047 }
2048
2049 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2050 if (__ratelimit(&drbd_ratelimit_state))
2051 dev_err(DEV, "Can not satisfy peer's read request, "
2052 "no local data.\n");
2053 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2054 P_NEG_RS_DREPLY , p);
2055 return drbd_drain_block(mdev, h->length - brps);
2056 }
2057
2058 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2059 * "criss-cross" setup, that might cause write-out on some other DRBD,
2060 * which in turn might block on the other node at this very place. */
2061 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2062 if (!e) {
2063 put_ldev(mdev);
2064 return FALSE;
2065 }
2066
2067 switch (h->command) {
2068 case P_DATA_REQUEST:
2069 e->w.cb = w_e_end_data_req;
2070 fault_type = DRBD_FAULT_DT_RD;
2071 break;
2072 case P_RS_DATA_REQUEST:
2073 e->w.cb = w_e_end_rsdata_req;
2074 fault_type = DRBD_FAULT_RS_RD;
2075 /* Eventually this should become asynchronous. Currently it
2076 * blocks the whole receiver just to delay the reading of a
2077 * resync data block.
2078 * the drbd_work_queue mechanism is made for this...
2079 */
2080 if (!drbd_rs_begin_io(mdev, sector)) {
2081 /* we have been interrupted,
2082 * probably connection lost! */
2083 D_ASSERT(signal_pending(current));
2084 goto out_free_e;
2085 }
2086 break;
2087
2088 case P_OV_REPLY:
2089 case P_CSUM_RS_REQUEST:
2090 fault_type = DRBD_FAULT_RS_RD;
2091 digest_size = h->length - brps ;
2092 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2093 if (!di)
2094 goto out_free_e;
2095
2096 di->digest_size = digest_size;
2097 di->digest = (((char *)di)+sizeof(struct digest_info));
2098
2099 e->digest = di;
2100 e->flags |= EE_HAS_DIGEST;
2101
2102 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2103 goto out_free_e;
2104
2105 if (h->command == P_CSUM_RS_REQUEST) {
2106 D_ASSERT(mdev->agreed_pro_version >= 89);
2107 e->w.cb = w_e_end_csum_rs_req;
2108 } else if (h->command == P_OV_REPLY) {
2109 e->w.cb = w_e_end_ov_reply;
2110 dec_rs_pending(mdev);
2111 break;
2112 }
2113
2114 if (!drbd_rs_begin_io(mdev, sector)) {
2115 /* we have been interrupted, probably connection lost! */
2116 D_ASSERT(signal_pending(current));
2117 goto out_free_e;
2118 }
2119 break;
2120
2121 case P_OV_REQUEST:
2122 if (mdev->state.conn >= C_CONNECTED &&
2123 mdev->state.conn != C_VERIFY_T)
2124 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2125 drbd_conn_str(mdev->state.conn));
2126 if (mdev->ov_start_sector == ~(sector_t)0 &&
2127 mdev->agreed_pro_version >= 90) {
2128 mdev->ov_start_sector = sector;
2129 mdev->ov_position = sector;
2130 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2131 dev_info(DEV, "Online Verify start sector: %llu\n",
2132 (unsigned long long)sector);
2133 }
2134 e->w.cb = w_e_end_ov_req;
2135 fault_type = DRBD_FAULT_RS_RD;
2136 /* Eventually this should become asynchronous. Currently it
2137 * blocks the whole receiver just to delay the reading of a
2138 * resync data block.
2139 * the drbd_work_queue mechanism is made for this...
2140 */
2141 if (!drbd_rs_begin_io(mdev, sector)) {
2142 /* we have been interrupted,
2143 * probably connection lost! */
2144 D_ASSERT(signal_pending(current));
2145 goto out_free_e;
2146 }
2147 break;
2148
2149
2150 default:
2151 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2152 cmdname(h->command));
2153 fault_type = DRBD_FAULT_MAX;
2154 }
2155
2156 spin_lock_irq(&mdev->req_lock);
2157 list_add(&e->w.list, &mdev->read_ee);
2158 spin_unlock_irq(&mdev->req_lock);
2159
2160 inc_unacked(mdev);
2161
2162 if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2163 return TRUE;
2164
2165out_free_e:
2166 put_ldev(mdev);
2167 drbd_free_ee(mdev, e);
2168 return FALSE;
2169}
2170
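/* After-split-brain recovery policy for the "zero primaries" case.
 * Return convention (matching the hg values used in drbd_sync_handshake()):
 *    1  discard the peer's modifications, we become sync source
 *   -1  discard our modifications, we become sync target
 * -100  no automatic decision possible, the split brain stays unresolved */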
2171static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2172{
2173 int self, peer, rv = -100;
2174 unsigned long ch_self, ch_peer;
2175
2176 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2177 peer = mdev->p_uuid[UI_BITMAP] & 1;
2178
2179 ch_peer = mdev->p_uuid[UI_SIZE];
2180 ch_self = mdev->comm_bm_set;
2181
2182 switch (mdev->net_conf->after_sb_0p) {
2183 case ASB_CONSENSUS:
2184 case ASB_DISCARD_SECONDARY:
2185 case ASB_CALL_HELPER:
2186 dev_err(DEV, "Configuration error.\n");
2187 break;
2188 case ASB_DISCONNECT:
2189 break;
2190 case ASB_DISCARD_YOUNGER_PRI:
2191 if (self == 0 && peer == 1) {
2192 rv = -1;
2193 break;
2194 }
2195 if (self == 1 && peer == 0) {
2196 rv = 1;
2197 break;
2198 }
2199 /* Else fall through to one of the other strategies... */
2200 case ASB_DISCARD_OLDER_PRI:
2201 if (self == 0 && peer == 1) {
2202 rv = 1;
2203 break;
2204 }
2205 if (self == 1 && peer == 0) {
2206 rv = -1;
2207 break;
2208 }
2209 /* Else fall through to one of the other strategies... */
2210 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2211 "Using discard-least-changes instead\n");
2212 case ASB_DISCARD_ZERO_CHG:
2213 if (ch_peer == 0 && ch_self == 0) {
2214 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2215 ? -1 : 1;
2216 break;
2217 } else {
2218 if (ch_peer == 0) { rv = 1; break; }
2219 if (ch_self == 0) { rv = -1; break; }
2220 }
2221 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2222 break;
2223 case ASB_DISCARD_LEAST_CHG:
2224 if (ch_self < ch_peer)
2225 rv = -1;
2226 else if (ch_self > ch_peer)
2227 rv = 1;
2228 else /* ( ch_self == ch_peer ) */
2229 /* Well, then use something else. */
2230 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2231 ? -1 : 1;
2232 break;
2233 case ASB_DISCARD_LOCAL:
2234 rv = -1;
2235 break;
2236 case ASB_DISCARD_REMOTE:
2237 rv = 1;
2238 }
2239
2240 return rv;
2241}
2242
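/* After-split-brain recovery policy when exactly one node is primary;
 * same return convention as drbd_asb_recover_0p() above. */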
2243static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2244{
2245 int self, peer, hg, rv = -100;
2246
2247 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2248 peer = mdev->p_uuid[UI_BITMAP] & 1;
2249
2250 switch (mdev->net_conf->after_sb_1p) {
2251 case ASB_DISCARD_YOUNGER_PRI:
2252 case ASB_DISCARD_OLDER_PRI:
2253 case ASB_DISCARD_LEAST_CHG:
2254 case ASB_DISCARD_LOCAL:
2255 case ASB_DISCARD_REMOTE:
2256 dev_err(DEV, "Configuration error.\n");
2257 break;
2258 case ASB_DISCONNECT:
2259 break;
2260 case ASB_CONSENSUS:
2261 hg = drbd_asb_recover_0p(mdev);
2262 if (hg == -1 && mdev->state.role == R_SECONDARY)
2263 rv = hg;
2264 if (hg == 1 && mdev->state.role == R_PRIMARY)
2265 rv = hg;
2266 break;
2267 case ASB_VIOLENTLY:
2268 rv = drbd_asb_recover_0p(mdev);
2269 break;
2270 case ASB_DISCARD_SECONDARY:
2271 return mdev->state.role == R_PRIMARY ? 1 : -1;
2272 case ASB_CALL_HELPER:
2273 hg = drbd_asb_recover_0p(mdev);
2274 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2275 self = drbd_set_role(mdev, R_SECONDARY, 0);
2276 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2277 * we might be here in C_WF_REPORT_PARAMS which is transient.
2278 * we do not need to wait for the after state change work either. */
2279 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2280 if (self != SS_SUCCESS) {
2281 drbd_khelper(mdev, "pri-lost-after-sb");
2282 } else {
2283 dev_warn(DEV, "Successfully gave up primary role.\n");
2284 rv = hg;
2285 }
2286 } else
2287 rv = hg;
2288 }
2289
2290 return rv;
2291}
2292
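/* After-split-brain recovery policy when both nodes are primary;
 * same return convention as drbd_asb_recover_0p() above. */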
2293static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2294{
2295 int self, peer, hg, rv = -100;
2296
2297 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2298 peer = mdev->p_uuid[UI_BITMAP] & 1;
2299
2300 switch (mdev->net_conf->after_sb_2p) {
2301 case ASB_DISCARD_YOUNGER_PRI:
2302 case ASB_DISCARD_OLDER_PRI:
2303 case ASB_DISCARD_LEAST_CHG:
2304 case ASB_DISCARD_LOCAL:
2305 case ASB_DISCARD_REMOTE:
2306 case ASB_CONSENSUS:
2307 case ASB_DISCARD_SECONDARY:
2308 dev_err(DEV, "Configuration error.\n");
2309 break;
2310 case ASB_VIOLENTLY:
2311 rv = drbd_asb_recover_0p(mdev);
2312 break;
2313 case ASB_DISCONNECT:
2314 break;
2315 case ASB_CALL_HELPER:
2316 hg = drbd_asb_recover_0p(mdev);
2317 if (hg == -1) {
2318 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2319 * we might be here in C_WF_REPORT_PARAMS which is transient.
2320 * we do not need to wait for the after state change work either. */
2321 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2322 if (self != SS_SUCCESS) {
2323 drbd_khelper(mdev, "pri-lost-after-sb");
2324 } else {
2325 dev_warn(DEV, "Successfully gave up primary role.\n");
2326 rv = hg;
2327 }
2328 } else
2329 rv = hg;
2330 }
2331
2332 return rv;
2333}
2334
2335static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2336 u64 bits, u64 flags)
2337{
2338 if (!uuid) {
2339 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2340 return;
2341 }
2342 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2343 text,
2344 (unsigned long long)uuid[UI_CURRENT],
2345 (unsigned long long)uuid[UI_BITMAP],
2346 (unsigned long long)uuid[UI_HISTORY_START],
2347 (unsigned long long)uuid[UI_HISTORY_END],
2348 (unsigned long long)bits,
2349 (unsigned long long)flags);
2350}
2351
2352/*
2353 100 after split brain try auto recover
2354 2 C_SYNC_SOURCE set BitMap
2355 1 C_SYNC_SOURCE use BitMap
2356 0 no Sync
2357 -1 C_SYNC_TARGET use BitMap
2358 -2 C_SYNC_TARGET set BitMap
2359 -100 after split brain, disconnect
2360-1000 unrelated data
-1001 needs a newer protocol version (at least 91)
2361 */
2362static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2363{
2364 u64 self, peer;
2365 int i, j;
2366
2367 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2368 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2369
2370 *rule_nr = 10;
2371 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2372 return 0;
2373
2374 *rule_nr = 20;
2375 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2376 peer != UUID_JUST_CREATED)
2377 return -2;
2378
2379 *rule_nr = 30;
2380 if (self != UUID_JUST_CREATED &&
2381 (peer == UUID_JUST_CREATED || peer == (u64)0))
2382 return 2;
2383
2384 if (self == peer) {
2385 int rct, dc; /* roles at crash time */
2386
2387 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2388
2389 if (mdev->agreed_pro_version < 91)
2390 return -1001;
2391
2392 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2393 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2394 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2395 drbd_uuid_set_bm(mdev, 0UL);
2396
2397 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2398 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2399 *rule_nr = 34;
2400 } else {
2401 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2402 *rule_nr = 36;
2403 }
2404
2405 return 1;
2406 }
2407
2408 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2409
2410 if (mdev->agreed_pro_version < 91)
2411 return -1001;
2412
2413 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2414 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2415 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2416
2417 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2418 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2419 mdev->p_uuid[UI_BITMAP] = 0UL;
2420
2421 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2422 *rule_nr = 35;
2423 } else {
2424 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2425 *rule_nr = 37;
2426 }
2427
2428 return -1;
2429 }
2430
2431 /* Common power [off|failure] */
2432 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2433 (mdev->p_uuid[UI_FLAGS] & 2);
2434 /* lowest bit is set when we were primary,
2435 * next bit (weight 2) is set when peer was primary */
2436 *rule_nr = 40;
2437
2438 switch (rct) {
2439 case 0: /* !self_pri && !peer_pri */ return 0;
2440 case 1: /* self_pri && !peer_pri */ return 1;
2441 case 2: /* !self_pri && peer_pri */ return -1;
2442 case 3: /* self_pri && peer_pri */
2443 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2444 return dc ? -1 : 1;
2445 }
2446 }
2447
2448 *rule_nr = 50;
2449 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2450 if (self == peer)
2451 return -1;
2452
2453 *rule_nr = 51;
2454 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2455 if (self == peer) {
2456 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2457 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2458 if (self == peer) {
2459 /* The last P_SYNC_UUID did not get through. Undo the modifications the
2460 peer made to its UUIDs when it last started a resync as sync source. */
2461
2462 if (mdev->agreed_pro_version < 91)
2463 return -1001;
2464
2465 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2466 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2467 return -1;
2468 }
2469 }
2470
2471 *rule_nr = 60;
2472 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2473 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2474 peer = mdev->p_uuid[i] & ~((u64)1);
2475 if (self == peer)
2476 return -2;
2477 }
2478
2479 *rule_nr = 70;
2480 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2481 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2482 if (self == peer)
2483 return 1;
2484
2485 *rule_nr = 71;
2486 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2487 if (self == peer) {
2488 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2489 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2490 if (self == peer) {
2491 /* The last P_SYNC_UUID did not get through. Undo the modifications we
2492 made to our own UUIDs when we last started a resync as sync source. */
2493
2494 if (mdev->agreed_pro_version < 91)
2495 return -1001;
2496
2497 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2498 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2499
2500 dev_info(DEV, "Undid last start of resync:\n");
2501
2502 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2503 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2504
2505 return 1;
2506 }
2507 }
2508
2509
2510 *rule_nr = 80;
2511 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2512 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2513 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2514 if (self == peer)
2515 return 2;
2516 }
2517
2518 *rule_nr = 90;
2519 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2520 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2521 if (self == peer && self != ((u64)0))
2522 return 100;
2523
2524 *rule_nr = 100;
2525 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2526 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2527 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2528 peer = mdev->p_uuid[j] & ~((u64)1);
2529 if (self == peer)
2530 return -100;
2531 }
2532 }
2533
2534 return -1000;
2535}
2536
2537/* drbd_sync_handshake() returns the new conn state on success, or
2538 CONN_MASK (-1) on failure.
2539 */
2540static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2541 enum drbd_disk_state peer_disk) __must_hold(local)
2542{
2543 int hg, rule_nr;
2544 enum drbd_conns rv = C_MASK;
2545 enum drbd_disk_state mydisk;
2546
2547 mydisk = mdev->state.disk;
2548 if (mydisk == D_NEGOTIATING)
2549 mydisk = mdev->new_state_tmp.disk;
2550
2551 dev_info(DEV, "drbd_sync_handshake:\n");
2552 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2553 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2554 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2555
2556 hg = drbd_uuid_compare(mdev, &rule_nr);
2557
2558 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2559
2560 if (hg == -1000) {
2561 dev_alert(DEV, "Unrelated data, aborting!\n");
2562 return C_MASK;
2563 }
2564 if (hg == -1001) {
2565 dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2566 return C_MASK;
2567 }
2568
2569 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2570 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2571 int f = (hg == -100) || abs(hg) == 2;
2572 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2573 if (f)
2574 hg = hg*2;
2575 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2576 hg > 0 ? "source" : "target");
2577 }
2578
2579 if (abs(hg) == 100)
2580 drbd_khelper(mdev, "initial-split-brain");
2581
2582 if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2583 int pcount = (mdev->state.role == R_PRIMARY)
2584 + (peer_role == R_PRIMARY);
2585 int forced = (hg == -100);
2586
2587 switch (pcount) {
2588 case 0:
2589 hg = drbd_asb_recover_0p(mdev);
2590 break;
2591 case 1:
2592 hg = drbd_asb_recover_1p(mdev);
2593 break;
2594 case 2:
2595 hg = drbd_asb_recover_2p(mdev);
2596 break;
2597 }
2598 if (abs(hg) < 100) {
2599 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2600 "automatically solved. Sync from %s node\n",
2601 pcount, (hg < 0) ? "peer" : "this");
2602 if (forced) {
2603 dev_warn(DEV, "Doing a full sync, since"
2604 " UUIDs where ambiguous.\n");
2605 hg = hg*2;
2606 }
2607 }
2608 }
2609
2610 if (hg == -100) {
2611 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2612 hg = -1;
2613 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2614 hg = 1;
2615
2616 if (abs(hg) < 100)
2617 dev_warn(DEV, "Split-Brain detected, manually solved. "
2618 "Sync from %s node\n",
2619 (hg < 0) ? "peer" : "this");
2620 }
2621
2622 if (hg == -100) {
2623 /* FIXME this log message is not correct if we end up here
2624 * after an attempted attach on a diskless node.
2625 * We just refuse to attach -- well, we drop the "connection"
2626 * to that disk, in a way... */
2627 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2628 drbd_khelper(mdev, "split-brain");
2629 return C_MASK;
2630 }
2631
2632 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2633 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2634 return C_MASK;
2635 }
2636
2637 if (hg < 0 && /* by intention we do not use mydisk here. */
2638 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2639 switch (mdev->net_conf->rr_conflict) {
2640 case ASB_CALL_HELPER:
2641 drbd_khelper(mdev, "pri-lost");
2642 /* fall through */
2643 case ASB_DISCONNECT:
2644 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2645 return C_MASK;
2646 case ASB_VIOLENTLY:
2647 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2648 "assumption\n");
2649 }
2650 }
2651
2652 if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2653 if (hg == 0)
2654 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2655 else
2656 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2657 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2658 abs(hg) >= 2 ? "full" : "bit-map based");
2659 return C_MASK;
2660 }
2661
2662 if (abs(hg) >= 2) {
2663 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2664 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2665 return C_MASK;
2666 }
2667
2668 if (hg > 0) { /* become sync source. */
2669 rv = C_WF_BITMAP_S;
2670 } else if (hg < 0) { /* become sync target */
2671 rv = C_WF_BITMAP_T;
2672 } else {
2673 rv = C_CONNECTED;
2674 if (drbd_bm_total_weight(mdev)) {
2675 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2676 drbd_bm_total_weight(mdev));
2677 }
2678 }
2679
2680 return rv;
2681}
2682
2683/* returns 1 if invalid */
2684static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2685{
2686 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2687 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2688 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2689 return 0;
2690
2691 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2692 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2693 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2694 return 1;
2695
2696 /* everything else is valid if they are equal on both sides. */
2697 if (peer == self)
2698 return 0;
2699
2700 /* everything else is invalid. */
2701 return 1;
2702}
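/* Example of the rules above: peer == ASB_DISCARD_REMOTE together with
 * self == ASB_DISCARD_LOCAL is consistent, both sides agree that this node's
 * data gets discarded.  discard-remote configured on both sides would have
 * each node discarding the other and is therefore rejected as invalid. */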
2703
2704static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2705{
2706 struct p_protocol *p = (struct p_protocol *)h;
2707 int header_size, data_size;
2708 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2709 int p_want_lose, p_two_primaries, cf;
2710 char p_integrity_alg[SHARED_SECRET_MAX] = "";
2711
2712 header_size = sizeof(*p) - sizeof(*h);
2713 data_size = h->length - header_size;
2714
2715 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2716 return FALSE;
2717
2718 p_proto = be32_to_cpu(p->protocol);
2719 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2720 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2721 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2722 p_two_primaries = be32_to_cpu(p->two_primaries);
2723 cf = be32_to_cpu(p->conn_flags);
2724 p_want_lose = cf & CF_WANT_LOSE;
2725
2726 clear_bit(CONN_DRY_RUN, &mdev->flags);
2727
2728 if (cf & CF_DRY_RUN)
2729 set_bit(CONN_DRY_RUN, &mdev->flags);
2730
2731 if (p_proto != mdev->net_conf->wire_protocol) {
2732 dev_err(DEV, "incompatible communication protocols\n");
2733 goto disconnect;
2734 }
2735
2736 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2737 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2738 goto disconnect;
2739 }
2740
2741 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2742 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2743 goto disconnect;
2744 }
2745
2746 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2747 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2748 goto disconnect;
2749 }
2750
2751 if (p_want_lose && mdev->net_conf->want_lose) {
2752 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2753 goto disconnect;
2754 }
2755
2756 if (p_two_primaries != mdev->net_conf->two_primaries) {
2757 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2758 goto disconnect;
2759 }
2760
2761 if (mdev->agreed_pro_version >= 87) {
2762 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2763
2764 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2765 return FALSE;
2766
2767 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2768 if (strcmp(p_integrity_alg, my_alg)) {
2769 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2770 goto disconnect;
2771 }
2772 dev_info(DEV, "data-integrity-alg: %s\n",
2773 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2774 }
2775
2776 return TRUE;
2777
2778disconnect:
2779 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2780 return FALSE;
2781}
2782
2783/* helper function
2784 * input: alg name, feature name
2785 * return: NULL (alg name was "")
2786 * ERR_PTR(error) if something goes wrong
2787 * or the crypto hash ptr, if it worked out ok. */
2788struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2789 const char *alg, const char *name)
2790{
2791 struct crypto_hash *tfm;
2792
2793 if (!alg[0])
2794 return NULL;
2795
2796 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2797 if (IS_ERR(tfm)) {
2798 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2799 alg, name, PTR_ERR(tfm));
2800 return tfm;
2801 }
2802 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2803 crypto_free_hash(tfm);
2804 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2805 return ERR_PTR(-EINVAL);
2806 }
2807 return tfm;
2808}
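/* Usage sketch, mirroring what receive_SyncParam() below does; callers have
 * to distinguish all three outcomes:
 *
 *	tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
 *	if (IS_ERR(tfm))
 *		goto disconnect;
 *
 * IS_ERR() covers allocation failure and "not a digest"; a plain NULL return
 * just means the algorithm name was "" and the feature stays unused. */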
2809
2810static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2811{
2812 int ok = TRUE;
2813 struct p_rs_param_95 *p = (struct p_rs_param_95 *)h;
2814 unsigned int header_size, data_size, exp_max_sz;
2815 struct crypto_hash *verify_tfm = NULL;
2816 struct crypto_hash *csums_tfm = NULL;
2817 const int apv = mdev->agreed_pro_version;
2818 int *rs_plan_s = NULL;
2819 int fifo_size = 0;
2820
2821 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2822 : apv == 88 ? sizeof(struct p_rs_param)
2823 + SHARED_SECRET_MAX
2824 : apv <= 94 ? sizeof(struct p_rs_param_89)
2825 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2826
2827 if (h->length > exp_max_sz) {
2828 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2829 h->length, exp_max_sz);
2830 return FALSE;
2831 }
2832
2833 if (apv <= 88) {
2834 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2835 data_size = h->length - header_size;
2836 } else if (apv <= 94) {
2837 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2838 data_size = h->length - header_size;
2839 D_ASSERT(data_size == 0);
2840 } else {
2841 header_size = sizeof(struct p_rs_param_95) - sizeof(*h);
2842 data_size = h->length - header_size;
2843 D_ASSERT(data_size == 0);
2844 }
2845
2846 /* initialize verify_alg and csums_alg */
2847 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2848
2849 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2850 return FALSE;
2851
2852 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2853
2854 if (apv >= 88) {
2855 if (apv == 88) {
2856 if (data_size > SHARED_SECRET_MAX) {
2857 dev_err(DEV, "verify-alg too long, "
2858 "peer wants %u, accepting only %u byte\n",
2859 data_size, SHARED_SECRET_MAX);
2860 return FALSE;
2861 }
2862
2863 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2864 return FALSE;
2865
2866 /* we expect NUL terminated string */
2867 /* but just in case someone tries to be evil */
2868 D_ASSERT(p->verify_alg[data_size-1] == 0);
2869 p->verify_alg[data_size-1] = 0;
2870
2871 } else /* apv >= 89 */ {
2872 /* we still expect NUL terminated strings */
2873 /* but just in case someone tries to be evil */
2874 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2875 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2876 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2877 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2878 }
2879
2880 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2881 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2882 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2883 mdev->sync_conf.verify_alg, p->verify_alg);
2884 goto disconnect;
2885 }
2886 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2887 p->verify_alg, "verify-alg");
2888 if (IS_ERR(verify_tfm)) {
2889 verify_tfm = NULL;
2890 goto disconnect;
2891 }
2892 }
2893
2894 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2895 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2896 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2897 mdev->sync_conf.csums_alg, p->csums_alg);
2898 goto disconnect;
2899 }
2900 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2901 p->csums_alg, "csums-alg");
2902 if (IS_ERR(csums_tfm)) {
2903 csums_tfm = NULL;
2904 goto disconnect;
2905 }
2906 }
2907
2908 if (apv > 94) {
2909 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2910 mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2911 mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2912 mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2913 mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2914
2915 fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2916 if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2917 rs_plan_s = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2918 if (!rs_plan_s) {
2919 dev_err(DEV, "kmalloc of fifo_buffer failed");
2920 goto disconnect;
2921 }
2922 }
2923 }
2924
2925 spin_lock(&mdev->peer_seq_lock);
2926 /* lock against drbd_nl_syncer_conf() */
2927 if (verify_tfm) {
2928 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2929 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2930 crypto_free_hash(mdev->verify_tfm);
2931 mdev->verify_tfm = verify_tfm;
2932 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2933 }
2934 if (csums_tfm) {
2935 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2936 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2937 crypto_free_hash(mdev->csums_tfm);
2938 mdev->csums_tfm = csums_tfm;
2939 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2940 }
2941 if (fifo_size != mdev->rs_plan_s.size) {
2942 kfree(mdev->rs_plan_s.values);
2943 mdev->rs_plan_s.values = rs_plan_s;
2944 mdev->rs_plan_s.size = fifo_size;
2945 mdev->rs_planed = 0;
2946 }
2947 spin_unlock(&mdev->peer_seq_lock);
2948 }
2949
2950 return ok;
2951disconnect:
2952 /* just for completeness: actually not needed,
2953 * as this is not reached if csums_tfm was ok. */
2954 crypto_free_hash(csums_tfm);
2955 /* but free the verify_tfm again, if csums_tfm did not work out */
2956 crypto_free_hash(verify_tfm);
2957 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2958 return FALSE;
2959}
2960
2961static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2962{
2963 /* sorry, we currently have no working implementation
2964 * of distributed TCQ */
2965}
2966
2967/* warn if the arguments differ by more than 12.5% */
2968static void warn_if_differ_considerably(struct drbd_conf *mdev,
2969 const char *s, sector_t a, sector_t b)
2970{
2971 sector_t d;
2972 if (a == 0 || b == 0)
2973 return;
2974 d = (a > b) ? (a - b) : (b - a);
2975 if (d > (a>>3) || d > (b>>3))
2976 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2977 (unsigned long long)a, (unsigned long long)b);
2978}
2979
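/* P_SIZES: the peer reports its backing device size, the requested user size
 * and its current capacity.  We remember the peer's size, agree on the
 * minimum user size on a fresh connect, refuse to shrink a device that still
 * holds usable data while connecting, then (re)determine our own size and,
 * if the device grew while connected, trigger a resync of the new area. */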
2980static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2981{
2982 struct p_sizes *p = (struct p_sizes *)h;
2983 enum determine_dev_size dd = unchanged;
2984 unsigned int max_seg_s;
2985 sector_t p_size, p_usize, my_usize;
2986 int ldsc = 0; /* local disk size changed */
2987 enum dds_flags ddsf;
2988
2989 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2990 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2991 return FALSE;
2992
2993 p_size = be64_to_cpu(p->d_size);
2994 p_usize = be64_to_cpu(p->u_size);
2995
2996 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2997 dev_err(DEV, "some backing storage is needed\n");
2998 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2999 return FALSE;
3000 }
3001
3002 /* just store the peer's disk size for now.
3003 * we still need to figure out whether we accept that. */
3004 mdev->p_size = p_size;
3005
3006#define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min(l, r)))
3007 if (get_ldev(mdev)) {
3008 warn_if_differ_considerably(mdev, "lower level device sizes",
3009 p_size, drbd_get_max_capacity(mdev->ldev));
3010 warn_if_differ_considerably(mdev, "user requested size",
3011 p_usize, mdev->ldev->dc.disk_size);
3012
3013 /* if this is the first connect, or an otherwise expected
3014 * param exchange, choose the minimum */
3015 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3016 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3017 p_usize);
3018
3019 my_usize = mdev->ldev->dc.disk_size;
3020
3021 if (mdev->ldev->dc.disk_size != p_usize) {
3022 mdev->ldev->dc.disk_size = p_usize;
3023 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3024 (unsigned long)mdev->ldev->dc.disk_size);
3025 }
3026
3027 /* Never shrink a device with usable data during connect.
3028 But allow online shrinking if we are connected. */
3029 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3030 drbd_get_capacity(mdev->this_bdev) &&
3031 mdev->state.disk >= D_OUTDATED &&
3032 mdev->state.conn < C_CONNECTED) {
3033 dev_err(DEV, "The peer's disk size is too small!\n");
3034 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3035 mdev->ldev->dc.disk_size = my_usize;
3036 put_ldev(mdev);
3037 return FALSE;
3038 }
3039 put_ldev(mdev);
3040 }
3041#undef min_not_zero
3042
3043 ddsf = be16_to_cpu(p->dds_flags);
3044 if (get_ldev(mdev)) {
3045 dd = drbd_determin_dev_size(mdev, ddsf);
3046 put_ldev(mdev);
3047 if (dd == dev_size_error)
3048 return FALSE;
3049 drbd_md_sync(mdev);
3050 } else {
3051 /* I am diskless, need to accept the peer's size. */
3052 drbd_set_my_capacity(mdev, p_size);
3053 }
3054
3055 if (get_ldev(mdev)) {
3056 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3057 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3058 ldsc = 1;
3059 }
3060
3061 if (mdev->agreed_pro_version < 94)
3062 max_seg_s = be32_to_cpu(p->max_segment_size);
3063 else /* drbd 8.3.8 onwards */
3064 max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3065
3066 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3067 drbd_setup_queue_param(mdev, max_seg_s);
3068
3069 drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3070 put_ldev(mdev);
3071 }
3072
3073 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3074 if (be64_to_cpu(p->c_size) !=
3075 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3076 /* we have different sizes, probably peer
3077 * needs to know my new size... */
3078 drbd_send_sizes(mdev, 0, ddsf);
3079 }
3080 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3081 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3082 if (mdev->state.pdsk >= D_INCONSISTENT &&
3083 mdev->state.disk >= D_INCONSISTENT) {
3084 if (ddsf & DDSF_NO_RESYNC)
3085 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3086 else
3087 resync_after_online_grow(mdev);
3088 } else
3089 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3090 }
3091 }
3092
3093 return TRUE;
3094}
3095
3096static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3097{
3098 struct p_uuids *p = (struct p_uuids *)h;
3099 u64 *p_uuid;
3100 int i;
3101
3102 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3103 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3104 return FALSE;
3105
3106 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
 if (!p_uuid) {
 dev_err(DEV, "kmalloc of p_uuid failed\n");
 return FALSE;
 }
3107
3108 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3109 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3110
3111 kfree(mdev->p_uuid);
3112 mdev->p_uuid = p_uuid;
3113
3114 if (mdev->state.conn < C_CONNECTED &&
3115 mdev->state.disk < D_INCONSISTENT &&
3116 mdev->state.role == R_PRIMARY &&
3117 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3118 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3119 (unsigned long long)mdev->ed_uuid);
3120 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3121 return FALSE;
3122 }
3123
3124 if (get_ldev(mdev)) {
3125 int skip_initial_sync =
3126 mdev->state.conn == C_CONNECTED &&
3127 mdev->agreed_pro_version >= 90 &&
3128 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3129 (p_uuid[UI_FLAGS] & 8);
3130 if (skip_initial_sync) {
3131 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3132 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3133 "clear_n_write from receive_uuids");
3134 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3135 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3136 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3137 CS_VERBOSE, NULL);
3138 drbd_md_sync(mdev);
3139 }
3140 put_ldev(mdev);
3141 } else if (mdev->state.disk < D_INCONSISTENT &&
3142 mdev->state.role == R_PRIMARY) {
3143 /* I am a diskless primary, the peer just created a new current UUID
3144 for me. */
3145 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3146 }
3147
3148 /* Before we test the disk state, we should wait until a possibly
3149 ongoing cluster wide state change has finished. That is important if
3150 we are primary and are detaching from our disk: we need to see the
3151 new disk state... */
3152 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3153 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3154 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3155
3156 return TRUE;
3157}
3158
3159/**
3160 * convert_state() - Converts the peer's view of the cluster state to our point of view
3161 * @ps: The state as seen by the peer.
3162 */
3163static union drbd_state convert_state(union drbd_state ps)
3164{
3165 union drbd_state ms;
3166
3167 static enum drbd_conns c_tab[] = {
3168 [C_CONNECTED] = C_CONNECTED,
3169
3170 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3171 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3172 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3173 [C_VERIFY_S] = C_VERIFY_T,
3174 [C_MASK] = C_MASK,
3175 };
3176
3177 ms.i = ps.i;
3178
3179 ms.conn = c_tab[ps.conn];
3180 ms.peer = ps.role;
3181 ms.role = ps.peer;
3182 ms.pdsk = ps.disk;
3183 ms.disk = ps.pdsk;
3184 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3185
3186 return ms;
3187}
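/* Example: if the peer reports { role=Primary, peer=Secondary, disk=UpToDate,
 * pdsk=Inconsistent }, the converted view from our side is { role=Secondary,
 * peer=Primary, disk=Inconsistent, pdsk=UpToDate }; connection states map
 * through c_tab, e.g. the peer's C_STARTING_SYNC_S becomes our
 * C_STARTING_SYNC_T. */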
3188
3189static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3190{
3191 struct p_req_state *p = (struct p_req_state *)h;
3192 union drbd_state mask, val;
3193 int rv;
3194
3195 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3196 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3197 return FALSE;
3198
3199 mask.i = be32_to_cpu(p->mask);
3200 val.i = be32_to_cpu(p->val);
3201
3202 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3203 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3204 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3205 return TRUE;
3206 }
3207
3208 mask = convert_state(mask);
3209 val = convert_state(val);
3210
3211 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3212
3213 drbd_send_sr_reply(mdev, rv);
3214 drbd_md_sync(mdev);
3215
3216 return TRUE;
3217}
3218
3219static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3220{
3221 struct p_state *p = (struct p_state *)h;
3222 enum drbd_conns nconn, oconn;
3223 union drbd_state ns, peer_state;
3224 enum drbd_disk_state real_peer_disk;
3225 enum chg_state_flags cs_flags;
3226 int rv;
3227
3228 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3229 return FALSE;
3230
3231 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3232 return FALSE;
3233
3234 peer_state.i = be32_to_cpu(p->state);
3235
3236 real_peer_disk = peer_state.disk;
3237 if (peer_state.disk == D_NEGOTIATING) {
3238 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3239 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3240 }
3241
3242 spin_lock_irq(&mdev->req_lock);
3243 retry:
3244 oconn = nconn = mdev->state.conn;
3245 spin_unlock_irq(&mdev->req_lock);
3246
3247 if (nconn == C_WF_REPORT_PARAMS)
3248 nconn = C_CONNECTED;
3249
3250 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3251 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3252 int cr; /* consider resync */
3253
3254 /* if we established a new connection */
3255 cr = (oconn < C_CONNECTED);
3256 /* if we had an established connection
3257 * and one of the nodes newly attaches a disk */
3258 cr |= (oconn == C_CONNECTED &&
3259 (peer_state.disk == D_NEGOTIATING ||
3260 mdev->state.disk == D_NEGOTIATING));
3261 /* if we have both been inconsistent, and the peer has been
3262 * forced to be UpToDate with --overwrite-data */
3263 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3264 /* if we had been plain connected, and the admin requested to
3265 * start a sync by "invalidate" or "invalidate-remote" */
3266 cr |= (oconn == C_CONNECTED &&
3267 (peer_state.conn >= C_STARTING_SYNC_S &&
3268 peer_state.conn <= C_WF_BITMAP_T));
3269
3270 if (cr)
3271 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3272
3273 put_ldev(mdev);
3274 if (nconn == C_MASK) {
3275 nconn = C_CONNECTED;
3276 if (mdev->state.disk == D_NEGOTIATING) {
3277 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3278 } else if (peer_state.disk == D_NEGOTIATING) {
3279 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3280 peer_state.disk = D_DISKLESS;
3281 real_peer_disk = D_DISKLESS;
3282 } else {
3283 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3284 return FALSE;
3285 D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3286 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3287 return FALSE;
3288 }
3289 }
3290 }
3291
3292 spin_lock_irq(&mdev->req_lock);
3293 if (mdev->state.conn != oconn)
3294 goto retry;
3295 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3296 ns.i = mdev->state.i;
3297 ns.conn = nconn;
3298 ns.peer = peer_state.role;
3299 ns.pdsk = real_peer_disk;
3300 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3301 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3302 ns.disk = mdev->new_state_tmp.disk;
3303 cs_flags = CS_VERBOSE + (oconn < C_CONNECTED && nconn >= C_CONNECTED ? 0 : CS_HARD);
3304 if (ns.pdsk == D_CONSISTENT && ns.susp && nconn == C_CONNECTED && oconn < C_CONNECTED &&
3305 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3306 /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3307 for temporary network outages! */
3308 spin_unlock_irq(&mdev->req_lock);
3309 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3310 tl_clear(mdev);
3311 drbd_uuid_new_current(mdev);
3312 clear_bit(NEW_CUR_UUID, &mdev->flags);
3313 drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3314 return FALSE;
3315 }
3316 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3317 ns = mdev->state;
3318 spin_unlock_irq(&mdev->req_lock);
3319
3320 if (rv < SS_SUCCESS) {
3321 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3322 return FALSE;
3323 }
3324
3325 if (oconn > C_WF_REPORT_PARAMS) {
3326 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3327 peer_state.disk != D_NEGOTIATING ) {
3328 /* we want resync, peer has not yet decided to sync... */
3329 /* Nowadays only used when forcing a node into primary role and
3330 setting its disk to UpToDate with that */
3331 drbd_send_uuids(mdev);
3332 drbd_send_state(mdev);
3333 }
3334 }
3335
3336 mdev->net_conf->want_lose = 0;
3337
3338 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3339
3340 return TRUE;
3341}
3342
3343static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3344{
3345 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3346
3347 wait_event(mdev->misc_wait,
3348 mdev->state.conn == C_WF_SYNC_UUID ||
3349 mdev->state.conn < C_CONNECTED ||
3350 mdev->state.disk < D_NEGOTIATING);
3351
3352 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3353
3354 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3355 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3356 return FALSE;
3357
3358 /* Here the _drbd_uuid_ functions are right, current should
3359 _not_ be rotated into the history */
3360 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3361 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3362 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3363
3364 drbd_start_resync(mdev, C_SYNC_TARGET);
3365
3366 put_ldev(mdev);
3367 } else
3368 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3369
3370 return TRUE;
3371}
3372
3373enum receive_bitmap_ret { OK, DONE, FAILED };
3374
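/* Receive one plain P_BITMAP chunk.  Each packet carries up to
 * BM_PACKET_WORDS longs; only the last chunk of a transfer may be shorter,
 * since num_words is clamped against the words remaining in the bitmap. */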
3375static enum receive_bitmap_ret
3376receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3377 unsigned long *buffer, struct bm_xfer_ctx *c)
3378{
3379 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3380 unsigned want = num_words * sizeof(long);
3381
3382 if (want != h->length) {
3383 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3384 return FAILED;
3385 }
3386 if (want == 0)
3387 return DONE;
3388 if (drbd_recv(mdev, buffer, want) != want)
3389 return FAILED;
3390
3391 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3392
3393 c->word_offset += num_words;
3394 c->bit_offset = c->word_offset * BITS_PER_LONG;
3395 if (c->bit_offset > c->bm_bits)
3396 c->bit_offset = c->bm_bits;
3397
3398 return OK;
3399}
3400
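/* Decode one compressed bitmap chunk.  The payload describes the bitmap as
 * alternating runs of clear and set bits: DCBP_get_start() gives the value
 * of the first run, each VLI-decoded number is the length of the next run,
 * and only the "set" runs are applied via _drbd_bm_set_bits().  For example
 * (made-up stream): start = 0 with run lengths 5, 3, 10 means 5 clear bits,
 * then 3 set bits, then 10 clear bits. */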
3401static enum receive_bitmap_ret
3402recv_bm_rle_bits(struct drbd_conf *mdev,
3403 struct p_compressed_bm *p,
3404 struct bm_xfer_ctx *c)
3405{
3406 struct bitstream bs;
3407 u64 look_ahead;
3408 u64 rl;
3409 u64 tmp;
3410 unsigned long s = c->bit_offset;
3411 unsigned long e;
3412 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3413 int toggle = DCBP_get_start(p);
3414 int have;
3415 int bits;
3416
3417 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3418
3419 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3420 if (bits < 0)
3421 return FAILED;
3422
3423 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3424 bits = vli_decode_bits(&rl, look_ahead);
3425 if (bits <= 0)
3426 return FAILED;
3427
3428 if (toggle) {
3429 e = s + rl - 1;
3430 if (e >= c->bm_bits) {
3431 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3432 return FAILED;
3433 }
3434 _drbd_bm_set_bits(mdev, s, e);
3435 }
3436
3437 if (have < bits) {
3438 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3439 have, bits, look_ahead,
3440 (unsigned int)(bs.cur.b - p->code),
3441 (unsigned int)bs.buf_len);
3442 return FAILED;
3443 }
3444 look_ahead >>= bits;
3445 have -= bits;
3446
3447 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3448 if (bits < 0)
3449 return FAILED;
3450 look_ahead |= tmp << have;
3451 have += bits;
3452 }
3453
3454 c->bit_offset = s;
3455 bm_xfer_ctx_bit_to_word_offset(c);
3456
3457 return (s == c->bm_bits) ? DONE : OK;
3458}
3459
3460static enum receive_bitmap_ret
3461decode_bitmap_c(struct drbd_conf *mdev,
3462 struct p_compressed_bm *p,
3463 struct bm_xfer_ctx *c)
3464{
3465 if (DCBP_get_code(p) == RLE_VLI_Bits)
3466 return recv_bm_rle_bits(mdev, p, c);
3467
3468 /* other variants had been implemented for evaluation,
3469 * but have been dropped as this one turned out to be "best"
3470 * during all our tests. */
3471
3472 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3473 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3474 return FAILED;
3475}
3476
3477void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3478 const char *direction, struct bm_xfer_ctx *c)
3479{
3480 /* what would it take to transfer it "plaintext" */
3481 unsigned plain = sizeof(struct p_header) *
3482 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3483 + c->bm_words * sizeof(long);
3484 unsigned total = c->bytes[0] + c->bytes[1];
3485 unsigned r;
3486
3487 /* total cannot be zero, but just in case: */
3488 if (total == 0)
3489 return;
3490
3491 /* don't report if not compressed */
3492 if (total >= plain)
3493 return;
3494
3495 /* total < plain. check for overflow, still */
3496 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3497 : (1000 * total / plain);
3498
3499 if (r > 1000)
3500 r = 1000;
3501
3502 r = 1000 - r;
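	/* Worked example with made-up numbers: plain = 4096, total = 512
	 * => r = 1000 - 1000*512/4096 = 875, printed below as
	 * "compression: 87.5%". */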
3503 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3504 "total %u; compression: %u.%u%%\n",
3505 direction,
3506 c->bytes[1], c->packets[1],
3507 c->bytes[0], c->packets[0],
3508 total, r/10, r % 10);
3509}
3510
3511/* Since we are processing the bitfield from lower addresses to higher,
3512 it does not matter whether we process it in 32 bit chunks or 64 bit
3513 chunks, as long as it is little endian. (Understand it as a byte stream,
3514 beginning with the lowest byte...) If we used big endian,
3515 we would need to process it from the highest address to the lowest,
3516 in order to be agnostic to the 32 vs 64 bit issue.
3517
3518 returns 0 on failure, 1 if we successfully received it. */
3519static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3520{
3521 struct bm_xfer_ctx c;
3522 void *buffer;
3523 enum receive_bitmap_ret ret;
3524 int ok = FALSE;
3525
3526 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3527
3528 drbd_bm_lock(mdev, "receive bitmap");
3529
3530 /* maybe we should use some per thread scratch page,
3531 * and allocate that during initial device creation? */
3532 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3533 if (!buffer) {
3534 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3535 goto out;
3536 }
3537
3538 c = (struct bm_xfer_ctx) {
3539 .bm_bits = drbd_bm_bits(mdev),
3540 .bm_words = drbd_bm_words(mdev),
3541 };
3542
3543 do {
3544 if (h->command == P_BITMAP) {
3545 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3546 } else if (h->command == P_COMPRESSED_BITMAP) {
3547 /* MAYBE: sanity check that we speak proto >= 90,
3548 * and the feature is enabled! */
3549 struct p_compressed_bm *p;
3550
3551 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3552 dev_err(DEV, "ReportCBitmap packet too large\n");
3553 goto out;
3554 }
3555 /* use the page buffer */
3556 p = buffer;
3557 memcpy(p, h, sizeof(*h));
3558 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3559 goto out;
3560 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3561 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3562 return FAILED;
3563 }
3564 ret = decode_bitmap_c(mdev, p, &c);
3565 } else {
3566 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3567 goto out;
3568 }
3569
3570 c.packets[h->command == P_BITMAP]++;
3571 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
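		/* index 1 accumulates the plain P_BITMAP traffic, index 0 the
		 * compressed traffic; INFO_bm_xfer_stats() prints them in
		 * that order. */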
3572
3573 if (ret != OK)
3574 break;
3575
3576 if (!drbd_recv_header(mdev, h))
3577 goto out;
3578 } while (ret == OK);
3579 if (ret == FAILED)
3580 goto out;
3581
3582 INFO_bm_xfer_stats(mdev, "receive", &c);
3583
3584 if (mdev->state.conn == C_WF_BITMAP_T) {
3585 ok = !drbd_send_bitmap(mdev);
3586 if (!ok)
3587 goto out;
3588 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3589 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3590 D_ASSERT(ok == SS_SUCCESS);
3591 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3592 /* admin may have requested C_DISCONNECTING,
3593 * other threads may have noticed network errors */
3594 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3595 drbd_conn_str(mdev->state.conn));
3596 }
3597
3598 ok = TRUE;
3599 out:
3600 drbd_bm_unlock(mdev);
3601 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3602 drbd_start_resync(mdev, C_SYNC_SOURCE);
3603 free_page((unsigned long) buffer);
3604 return ok;
3605}
3606
Lars Ellenberge7f52df2010-08-03 20:20:20 +02003607static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
Philipp Reisnerb411b362009-09-25 16:07:19 -07003608{
3609 /* TODO zero copy sink :) */
3610 static char sink[128];
3611 int size, want, r;
3612
Lars Ellenberge7f52df2010-08-03 20:20:20 +02003613 if (!silent)
3614 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3615 h->command, h->length);
Philipp Reisnerb411b362009-09-25 16:07:19 -07003616
3617 size = h->length;
3618 while (size > 0) {
3619 want = min_t(int, size, sizeof(sink));
3620 r = drbd_recv(mdev, sink, want);
3621 ERR_IF(r <= 0) break;
3622 size -= r;
3623 }
3624 return size == 0;
3625}
3626
Lars Ellenberge7f52df2010-08-03 20:20:20 +02003627static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3628{
3629 return receive_skip_(mdev, h, 0);
3630}
3631
3632static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h)
3633{
3634 return receive_skip_(mdev, h, 1);
3635}
3636
Philipp Reisnerb411b362009-09-25 16:07:19 -07003637static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3638{
3639 if (mdev->state.disk >= D_INCONSISTENT)
3640 drbd_kick_lo(mdev);
3641
3642 /* Make sure we've acked all the TCP data associated
3643 * with the data requests being unplugged */
3644 drbd_tcp_quickack(mdev->data.socket);
3645
3646 return TRUE;
3647}
3648
3649typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3650
3651static drbd_cmd_handler_f drbd_default_handler[] = {
3652 [P_DATA] = receive_Data,
3653 [P_DATA_REPLY] = receive_DataReply,
3654 [P_RS_DATA_REPLY] = receive_RSDataReply,
3655 [P_BARRIER] = receive_Barrier,
3656 [P_BITMAP] = receive_bitmap,
3657 [P_COMPRESSED_BITMAP] = receive_bitmap,
3658 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3659 [P_DATA_REQUEST] = receive_DataRequest,
3660 [P_RS_DATA_REQUEST] = receive_DataRequest,
3661 [P_SYNC_PARAM] = receive_SyncParam,
3662 [P_SYNC_PARAM89] = receive_SyncParam,
3663 [P_PROTOCOL] = receive_protocol,
3664 [P_UUIDS] = receive_uuids,
3665 [P_SIZES] = receive_sizes,
3666 [P_STATE] = receive_state,
3667 [P_STATE_CHG_REQ] = receive_req_state,
3668 [P_SYNC_UUID] = receive_sync_uuid,
3669 [P_OV_REQUEST] = receive_DataRequest,
3670 [P_OV_REPLY] = receive_DataRequest,
3671 [P_CSUM_RS_REQUEST] = receive_DataRequest,
Lars Ellenberge7f52df2010-08-03 20:20:20 +02003672 [P_DELAY_PROBE] = receive_skip_silent,
Philipp Reisnerb411b362009-09-25 16:07:19 -07003673 /* anything missing from this table is in
3674 * the asender_tbl, see get_asender_cmd */
3675 [P_MAX_CMD] = NULL,
3676};
3677
3678static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3679static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3680
3681static void drbdd(struct drbd_conf *mdev)
3682{
3683 drbd_cmd_handler_f handler;
3684 struct p_header *header = &mdev->data.rbuf.header;
3685
3686 while (get_t_state(&mdev->receiver) == Running) {
3687 drbd_thread_current_set_cpu(mdev);
Lars Ellenberg0b33a912009-11-16 15:58:04 +01003688 if (!drbd_recv_header(mdev, header)) {
3689 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
Philipp Reisnerb411b362009-09-25 16:07:19 -07003690 break;
Lars Ellenberg0b33a912009-11-16 15:58:04 +01003691 }
Philipp Reisnerb411b362009-09-25 16:07:19 -07003692
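		/* Dispatch: known packet types index drbd_default_handler[]
		 * directly; optional types above P_MAY_IGNORE go through
		 * drbd_opt_cmd_handler[]; anything beyond P_MAX_OPT_CMD is
		 * unknown but skippable and gets drained by receive_skip(). */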
3693 if (header->command < P_MAX_CMD)
3694 handler = drbd_cmd_handler[header->command];
3695 else if (P_MAY_IGNORE < header->command
3696 && header->command < P_MAX_OPT_CMD)
3697 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3698 else if (header->command > P_MAX_OPT_CMD)
3699 handler = receive_skip;
3700 else
3701 handler = NULL;
3702
3703 if (unlikely(!handler)) {
3704 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3705 header->command, header->length);
3706 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3707 break;
3708 }
3709 if (unlikely(!handler(mdev, header))) {
3710 dev_err(DEV, "error receiving %s, l: %d!\n",
3711 cmdname(header->command), header->length);
3712 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3713 break;
3714 }
Philipp Reisnerb411b362009-09-25 16:07:19 -07003715 }
3716}
3717
Philipp Reisnerb411b362009-09-25 16:07:19 -07003718void drbd_flush_workqueue(struct drbd_conf *mdev)
3719{
3720 struct drbd_wq_barrier barr;
3721
3722 barr.w.cb = w_prev_work_done;
3723 init_completion(&barr.done);
3724 drbd_queue_work(&mdev->data.work, &barr.w);
3725 wait_for_completion(&barr.done);
3726}
3727
Philipp Reisnerf70b35112010-06-24 14:34:40 +02003728void drbd_free_tl_hash(struct drbd_conf *mdev)
3729{
3730 struct hlist_head *h;
3731
3732 spin_lock_irq(&mdev->req_lock);
3733
3734 if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3735 spin_unlock_irq(&mdev->req_lock);
3736 return;
3737 }
3738 /* paranoia code */
3739 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3740 if (h->first)
3741 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3742 (int)(h - mdev->ee_hash), h->first);
3743 kfree(mdev->ee_hash);
3744 mdev->ee_hash = NULL;
3745 mdev->ee_hash_s = 0;
3746
3747 /* paranoia code */
3748 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3749 if (h->first)
3750 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3751 (int)(h - mdev->tl_hash), h->first);
3752 kfree(mdev->tl_hash);
3753 mdev->tl_hash = NULL;
3754 mdev->tl_hash_s = 0;
3755 spin_unlock_irq(&mdev->req_lock);
3756}
3757
Philipp Reisnerb411b362009-09-25 16:07:19 -07003758static void drbd_disconnect(struct drbd_conf *mdev)
3759{
3760 enum drbd_fencing_p fp;
3761 union drbd_state os, ns;
3762 int rv = SS_UNKNOWN_ERROR;
3763 unsigned int i;
3764
3765 if (mdev->state.conn == C_STANDALONE)
3766 return;
3767 if (mdev->state.conn >= C_WF_CONNECTION)
3768 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3769 drbd_conn_str(mdev->state.conn));
3770
3771 /* asender does not clean up anything. it must not interfere, either */
3772 drbd_thread_stop(&mdev->asender);
Philipp Reisnerb411b362009-09-25 16:07:19 -07003773 drbd_free_sock(mdev);
Philipp Reisnerb411b362009-09-25 16:07:19 -07003774
Philipp Reisner85719572010-07-21 10:20:17 +02003775 /* wait for current activity to cease. */
Philipp Reisnerb411b362009-09-25 16:07:19 -07003776 spin_lock_irq(&mdev->req_lock);
3777 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3778 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3779 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3780 spin_unlock_irq(&mdev->req_lock);
3781
3782 /* We do not have data structures that would allow us to
3783 * get the rs_pending_cnt down to 0 again.
3784 * * On C_SYNC_TARGET we do not have any data structures describing
3785 * the pending RSDataRequest's we have sent.
3786 * * On C_SYNC_SOURCE there is no data structure that tracks
3787 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3788 * And no, it is not the sum of the reference counts in the
3789 * resync_LRU. The resync_LRU tracks the whole operation including
3790 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3791 * on the fly. */
3792 drbd_rs_cancel_all(mdev);
3793 mdev->rs_total = 0;
3794 mdev->rs_failed = 0;
3795 atomic_set(&mdev->rs_pending_cnt, 0);
3796 wake_up(&mdev->misc_wait);
3797
3798 /* make sure syncer is stopped and w_resume_next_sg queued */
3799 del_timer_sync(&mdev->resync_timer);
3800 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3801 resync_timer_fn((unsigned long)mdev);
3802
Philipp Reisnerb411b362009-09-25 16:07:19 -07003803 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3804 * w_make_resync_request etc. which may still be on the worker queue
3805 * to be "canceled" */
3806 drbd_flush_workqueue(mdev);
3807
3808 /* This also does reclaim_net_ee(). If we do this too early, we might
3809 * miss some resync ee and pages.*/
3810 drbd_process_done_ee(mdev);
3811
3812 kfree(mdev->p_uuid);
3813 mdev->p_uuid = NULL;
3814
3815 if (!mdev->state.susp)
3816 tl_clear(mdev);
3817
Philipp Reisnerb411b362009-09-25 16:07:19 -07003818 dev_info(DEV, "Connection closed\n");
3819
3820 drbd_md_sync(mdev);
3821
3822 fp = FP_DONT_CARE;
3823 if (get_ldev(mdev)) {
3824 fp = mdev->ldev->dc.fencing;
3825 put_ldev(mdev);
3826 }
3827
Philipp Reisner87f7be42010-06-11 13:56:33 +02003828 if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3829 drbd_try_outdate_peer_async(mdev);
Philipp Reisnerb411b362009-09-25 16:07:19 -07003830
3831 spin_lock_irq(&mdev->req_lock);
3832 os = mdev->state;
3833 if (os.conn >= C_UNCONNECTED) {
3834 /* Do not restart in case we are C_DISCONNECTING */
3835 ns = os;
3836 ns.conn = C_UNCONNECTED;
3837 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3838 }
3839 spin_unlock_irq(&mdev->req_lock);
3840
3841 if (os.conn == C_DISCONNECTING) {
Philipp Reisner84dfb9f2010-06-23 11:20:05 +02003842 wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
Philipp Reisnerb411b362009-09-25 16:07:19 -07003843
Philipp Reisnerf70b35112010-06-24 14:34:40 +02003844 if (!mdev->state.susp) {
3845 /* we must not free the tl_hash
3846 * while application io is still on the fly */
3847 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3848 drbd_free_tl_hash(mdev);
3849 }
Philipp Reisnerb411b362009-09-25 16:07:19 -07003850
3851 crypto_free_hash(mdev->cram_hmac_tfm);
3852 mdev->cram_hmac_tfm = NULL;
3853
3854 kfree(mdev->net_conf);
3855 mdev->net_conf = NULL;
3856 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3857 }
3858
3859 /* tcp_close and release of sendpage pages can be deferred. I don't
3860 * want to use SO_LINGER, because apparently it can be deferred for
3861 * more than 20 seconds (longest time I checked).
3862 *
3863 * Actually we don't care exactly when the network stack does its
3864 * put_page(), but release our reference on these pages right here.
3865 */
3866 i = drbd_release_ee(mdev, &mdev->net_ee);
3867 if (i)
3868 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3869 i = atomic_read(&mdev->pp_in_use);
3870 if (i)
Lars Ellenberg45bb9122010-05-14 17:10:48 +02003871 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
Philipp Reisnerb411b362009-09-25 16:07:19 -07003872
3873 D_ASSERT(list_empty(&mdev->read_ee));
3874 D_ASSERT(list_empty(&mdev->active_ee));
3875 D_ASSERT(list_empty(&mdev->sync_ee));
3876 D_ASSERT(list_empty(&mdev->done_ee));
3877
3878 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3879 atomic_set(&mdev->current_epoch->epoch_size, 0);
3880 D_ASSERT(list_empty(&mdev->current_epoch->list));
3881}
3882
3883/*
3884 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3885 * we can agree on is stored in agreed_pro_version.
3886 *
3887 * feature flags and the reserved array should be enough room for future
3888 * enhancements of the handshake protocol, and possible plugins...
3889 *
3890 * for now, they are expected to be zero, but ignored.
3891 */
3892static int drbd_send_handshake(struct drbd_conf *mdev)
3893{
3894 /* ASSERT current == mdev->receiver ... */
3895 struct p_handshake *p = &mdev->data.sbuf.handshake;
3896 int ok;
3897
3898 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3899 dev_err(DEV, "interrupted during initial handshake\n");
3900 return 0; /* interrupted. not ok. */
3901 }
3902
3903 if (mdev->data.socket == NULL) {
3904 mutex_unlock(&mdev->data.mutex);
3905 return 0;
3906 }
3907
3908 memset(p, 0, sizeof(*p));
3909 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3910 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3911 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3912 (struct p_header *)p, sizeof(*p), 0 );
3913 mutex_unlock(&mdev->data.mutex);
3914 return ok;
3915}
3916
3917/*
3918 * return values:
3919 * 1 yes, we have a valid connection
3920 * 0 oops, did not work out, please try again
3921 * -1 peer talks different language,
3922 * no point in trying again, please go standalone.
3923 */
3924static int drbd_do_handshake(struct drbd_conf *mdev)
3925{
3926 /* ASSERT current == mdev->receiver ... */
3927 struct p_handshake *p = &mdev->data.rbuf.handshake;
3928 const int expect = sizeof(struct p_handshake)
3929 -sizeof(struct p_header);
3930 int rv;
3931
3932 rv = drbd_send_handshake(mdev);
3933 if (!rv)
3934 return 0;
3935
3936 rv = drbd_recv_header(mdev, &p->head);
3937 if (!rv)
3938 return 0;
3939
3940 if (p->head.command != P_HAND_SHAKE) {
3941 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3942 cmdname(p->head.command), p->head.command);
3943 return -1;
3944 }
3945
3946 if (p->head.length != expect) {
3947 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3948 expect, p->head.length);
3949 return -1;
3950 }
3951
3952 rv = drbd_recv(mdev, &p->head.payload, expect);
3953
3954 if (rv != expect) {
3955 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3956 return 0;
3957 }
3958
Philipp Reisnerb411b362009-09-25 16:07:19 -07003959 p->protocol_min = be32_to_cpu(p->protocol_min);
3960 p->protocol_max = be32_to_cpu(p->protocol_max);
3961 if (p->protocol_max == 0)
3962 p->protocol_max = p->protocol_min;
3963
3964 if (PRO_VERSION_MAX < p->protocol_min ||
3965 PRO_VERSION_MIN > p->protocol_max)
3966 goto incompat;
3967
3968 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
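	/* Example with made-up numbers: if we support versions 86..95 and the
	 * peer reports 86..91, both sides settle on min(95, 91) = 91. */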
3969
3970 dev_info(DEV, "Handshake successful: "
3971 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3972
3973 return 1;
3974
3975 incompat:
3976 dev_err(DEV, "incompatible DRBD dialects: "
3977 "I support %d-%d, peer supports %d-%d\n",
3978 PRO_VERSION_MIN, PRO_VERSION_MAX,
3979 p->protocol_min, p->protocol_max);
3980 return -1;
3981}
3982
3983#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3984static int drbd_do_auth(struct drbd_conf *mdev)
3985{
3986 dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3987 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
Johannes Thomab10d96c2010-01-07 16:02:50 +01003988 return -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07003989}
3990#else
3991#define CHALLENGE_LEN 64
Johannes Thomab10d96c2010-01-07 16:02:50 +01003992
3993/* Return value:
3994 1 - auth succeeded,
3995 0 - failed, try again (network error),
3996 -1 - auth failed, don't try again.
3997*/
3998
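/* Rough shape of the exchange below (the peer presumably mirrors it):
 * send a random challenge (P_AUTH_CHALLENGE), answer the peer's challenge
 * with HMAC(shared_secret, peer_challenge) in P_AUTH_RESPONSE, then compare
 * the peer's answer against HMAC(shared_secret, my_challenge). */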
Philipp Reisnerb411b362009-09-25 16:07:19 -07003999static int drbd_do_auth(struct drbd_conf *mdev)
4000{
4001 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4002 struct scatterlist sg;
4003 char *response = NULL;
4004 char *right_response = NULL;
4005 char *peers_ch = NULL;
4006 struct p_header p;
4007 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4008 unsigned int resp_size;
4009 struct hash_desc desc;
4010 int rv;
4011
4012 desc.tfm = mdev->cram_hmac_tfm;
4013 desc.flags = 0;
4014
4015 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4016 (u8 *)mdev->net_conf->shared_secret, key_len);
4017 if (rv) {
4018 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
Johannes Thomab10d96c2010-01-07 16:02:50 +01004019 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004020 goto fail;
4021 }
4022
4023 get_random_bytes(my_challenge, CHALLENGE_LEN);
4024
4025 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4026 if (!rv)
4027 goto fail;
4028
4029 rv = drbd_recv_header(mdev, &p);
4030 if (!rv)
4031 goto fail;
4032
4033 if (p.command != P_AUTH_CHALLENGE) {
4034 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4035 cmdname(p.command), p.command);
4036 rv = 0;
4037 goto fail;
4038 }
4039
4040 if (p.length > CHALLENGE_LEN*2) {
4041 dev_err(DEV, "expected AuthChallenge payload too big.\n");
Johannes Thomab10d96c2010-01-07 16:02:50 +01004042 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004043 goto fail;
4044 }
4045
4046 peers_ch = kmalloc(p.length, GFP_NOIO);
4047 if (peers_ch == NULL) {
4048 dev_err(DEV, "kmalloc of peers_ch failed\n");
Johannes Thomab10d96c2010-01-07 16:02:50 +01004049 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004050 goto fail;
4051 }
4052
4053 rv = drbd_recv(mdev, peers_ch, p.length);
4054
4055 if (rv != p.length) {
4056 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4057 rv = 0;
4058 goto fail;
4059 }
4060
4061 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4062 response = kmalloc(resp_size, GFP_NOIO);
4063 if (response == NULL) {
4064 dev_err(DEV, "kmalloc of response failed\n");
Johannes Thomab10d96c2010-01-07 16:02:50 +01004065 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004066 goto fail;
4067 }
4068
4069 sg_init_table(&sg, 1);
4070 sg_set_buf(&sg, peers_ch, p.length);
4071
4072 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4073 if (rv) {
4074 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
Johannes Thomab10d96c2010-01-07 16:02:50 +01004075 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004076 goto fail;
4077 }
4078
4079 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4080 if (!rv)
4081 goto fail;
4082
4083 rv = drbd_recv_header(mdev, &p);
4084 if (!rv)
4085 goto fail;
4086
4087 if (p.command != P_AUTH_RESPONSE) {
4088 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4089 cmdname(p.command), p.command);
4090 rv = 0;
4091 goto fail;
4092 }
4093
4094 if (p.length != resp_size) {
4095 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4096 rv = 0;
4097 goto fail;
4098 }
4099
4100 rv = drbd_recv(mdev, response, resp_size);
4101
4102 if (rv != resp_size) {
4103 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4104 rv = 0;
4105 goto fail;
4106 }
4107
4108 right_response = kmalloc(resp_size, GFP_NOIO);
Julia Lawall2d1ee872009-12-27 22:27:11 +01004109 if (right_response == NULL) {
Philipp Reisnerb411b362009-09-25 16:07:19 -07004110 dev_err(DEV, "kmalloc of right_response failed\n");
Johannes Thomab10d96c2010-01-07 16:02:50 +01004111 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004112 goto fail;
4113 }
4114
4115 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4116
4117 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4118 if (rv) {
4119 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
Johannes Thomab10d96c2010-01-07 16:02:50 +01004120 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004121 goto fail;
4122 }
4123
4124 rv = !memcmp(response, right_response, resp_size);
4125
4126 if (rv)
4127 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4128 resp_size, mdev->net_conf->cram_hmac_alg);
Johannes Thomab10d96c2010-01-07 16:02:50 +01004129 else
4130 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004131
4132 fail:
4133 kfree(peers_ch);
4134 kfree(response);
4135 kfree(right_response);
4136
4137 return rv;
4138}
4139#endif
4140
4141int drbdd_init(struct drbd_thread *thi)
4142{
4143 struct drbd_conf *mdev = thi->mdev;
4144 unsigned int minor = mdev_to_minor(mdev);
4145 int h;
4146
4147 sprintf(current->comm, "drbd%d_receiver", minor);
4148
4149 dev_info(DEV, "receiver (re)started\n");
4150
4151 do {
4152 h = drbd_connect(mdev);
4153 if (h == 0) {
4154 drbd_disconnect(mdev);
4155 __set_current_state(TASK_INTERRUPTIBLE);
4156 schedule_timeout(HZ);
4157 }
4158 if (h == -1) {
4159 dev_warn(DEV, "Discarding network configuration.\n");
4160 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4161 }
4162 } while (h == 0);
4163
4164 if (h > 0) {
4165 if (get_net_conf(mdev)) {
4166 drbdd(mdev);
4167 put_net_conf(mdev);
4168 }
4169 }
4170
4171 drbd_disconnect(mdev);
4172
4173 dev_info(DEV, "receiver terminated\n");
4174 return 0;
4175}
4176
4177/* ********* acknowledge sender ******** */
4178
4179static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4180{
4181 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4182
4183 int retcode = be32_to_cpu(p->retcode);
4184
4185 if (retcode >= SS_SUCCESS) {
4186 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4187 } else {
4188 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4189 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4190 drbd_set_st_err_str(retcode), retcode);
4191 }
4192 wake_up(&mdev->state_wait);
4193
4194 return TRUE;
4195}
4196
4197static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4198{
4199 return drbd_send_ping_ack(mdev);
4200
4201}
4202
4203static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4204{
4205 /* restore idle timeout */
4206 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
Philipp Reisner309d1602010-03-02 15:03:44 +01004207 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4208 wake_up(&mdev->misc_wait);
Philipp Reisnerb411b362009-09-25 16:07:19 -07004209
4210 return TRUE;
4211}
4212
4213static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4214{
4215 struct p_block_ack *p = (struct p_block_ack *)h;
4216 sector_t sector = be64_to_cpu(p->sector);
4217 int blksize = be32_to_cpu(p->blksize);
4218
4219 D_ASSERT(mdev->agreed_pro_version >= 89);
4220
4221 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4222
4223 drbd_rs_complete_io(mdev, sector);
4224 drbd_set_in_sync(mdev, sector, blksize);
4225 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4226 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4227 dec_rs_pending(mdev);
Philipp Reisner778f2712010-07-06 11:14:00 +02004228 atomic_add(blksize >> 9, &mdev->rs_sect_in);
Philipp Reisnerb411b362009-09-25 16:07:19 -07004229
4230 return TRUE;
4231}
4232
4233/* when we receive the ACK for a write request,
4234 * verify that we actually know about it */
4235static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4236 u64 id, sector_t sector)
4237{
4238 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4239 struct hlist_node *n;
4240 struct drbd_request *req;
4241
4242 hlist_for_each_entry(req, n, slot, colision) {
4243 if ((unsigned long)req == (unsigned long)id) {
4244 if (req->sector != sector) {
4245 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4246 "wrong sector (%llus versus %llus)\n", req,
4247 (unsigned long long)req->sector,
4248 (unsigned long long)sector);
4249 break;
4250 }
4251 return req;
4252 }
4253 }
4254 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4255 (void *)(unsigned long)id, (unsigned long long)sector);
4256 return NULL;
4257}
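/* The block_id echoed in these ACKs is apparently the request pointer that
 * went out with the original write, so _ack_id_to_req() can look it up in
 * the tl_hash slot for that sector and cross-check the sector as a sanity
 * test. */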
4258
4259typedef struct drbd_request *(req_validator_fn)
4260 (struct drbd_conf *mdev, u64 id, sector_t sector);
4261
4262static int validate_req_change_req_state(struct drbd_conf *mdev,
4263 u64 id, sector_t sector, req_validator_fn validator,
4264 const char *func, enum drbd_req_event what)
4265{
4266 struct drbd_request *req;
4267 struct bio_and_error m;
4268
4269 spin_lock_irq(&mdev->req_lock);
4270 req = validator(mdev, id, sector);
4271 if (unlikely(!req)) {
4272 spin_unlock_irq(&mdev->req_lock);
4273 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4274 return FALSE;
4275 }
4276 __req_mod(req, what, &m);
4277 spin_unlock_irq(&mdev->req_lock);
4278
4279 if (m.bio)
4280 complete_master_bio(mdev, &m);
4281 return TRUE;
4282}
4283
4284static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4285{
4286 struct p_block_ack *p = (struct p_block_ack *)h;
4287 sector_t sector = be64_to_cpu(p->sector);
4288 int blksize = be32_to_cpu(p->blksize);
4289 enum drbd_req_event what;
4290
4291 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4292
4293 if (is_syncer_block_id(p->block_id)) {
4294 drbd_set_in_sync(mdev, sector, blksize);
4295 dec_rs_pending(mdev);
4296 return TRUE;
4297 }
4298 switch (be16_to_cpu(h->command)) {
4299 case P_RS_WRITE_ACK:
4300 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4301 what = write_acked_by_peer_and_sis;
4302 break;
4303 case P_WRITE_ACK:
4304 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4305 what = write_acked_by_peer;
4306 break;
4307 case P_RECV_ACK:
4308 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4309 what = recv_acked_by_peer;
4310 break;
4311 case P_DISCARD_ACK:
4312 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4313 what = conflict_discarded_by_peer;
4314 break;
4315 default:
4316 D_ASSERT(0);
4317 return FALSE;
4318 }
4319
4320 return validate_req_change_req_state(mdev, p->block_id, sector,
4321 _ack_id_to_req, __func__ , what);
4322}
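/* As the D_ASSERTs above suggest, the ack type reflects the wire protocol:
 * protocol B peers answer with P_RECV_ACK (data received), protocol C peers
 * with P_WRITE_ACK / P_RS_WRITE_ACK (data written), and P_DISCARD_ACK
 * apparently resolves concurrent-write conflicts. */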
4323
4324static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4325{
4326 struct p_block_ack *p = (struct p_block_ack *)h;
4327 sector_t sector = be64_to_cpu(p->sector);
4328
4329 if (__ratelimit(&drbd_ratelimit_state))
4330 dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4331
4332 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4333
4334 if (is_syncer_block_id(p->block_id)) {
4335 int size = be32_to_cpu(p->blksize);
4336 dec_rs_pending(mdev);
4337 drbd_rs_failed_io(mdev, sector, size);
4338 return TRUE;
4339 }
4340 return validate_req_change_req_state(mdev, p->block_id, sector,
4341 _ack_id_to_req, __func__ , neg_acked);
4342}
4343
4344static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4345{
4346 struct p_block_ack *p = (struct p_block_ack *)h;
4347 sector_t sector = be64_to_cpu(p->sector);
4348
4349 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4350 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4351 (unsigned long long)sector, be32_to_cpu(p->blksize));
4352
4353 return validate_req_change_req_state(mdev, p->block_id, sector,
4354 _ar_id_to_req, __func__ , neg_acked);
4355}
4356
4357static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4358{
4359 sector_t sector;
4360 int size;
4361 struct p_block_ack *p = (struct p_block_ack *)h;
4362
4363 sector = be64_to_cpu(p->sector);
4364 size = be32_to_cpu(p->blksize);
Philipp Reisnerb411b362009-09-25 16:07:19 -07004365
4366 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4367
4368 dec_rs_pending(mdev);
4369
4370 if (get_ldev_if_state(mdev, D_FAILED)) {
4371 drbd_rs_complete_io(mdev, sector);
4372 drbd_rs_failed_io(mdev, sector, size);
4373 put_ldev(mdev);
4374 }
4375
4376 return TRUE;
4377}
4378
4379static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4380{
4381 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4382
4383 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4384
4385 return TRUE;
4386}
4387
4388static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4389{
4390 struct p_block_ack *p = (struct p_block_ack *)h;
4391 struct drbd_work *w;
4392 sector_t sector;
4393 int size;
4394
4395 sector = be64_to_cpu(p->sector);
4396 size = be32_to_cpu(p->blksize);
4397
4398 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4399
4400 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4401 drbd_ov_oos_found(mdev, sector, size);
4402 else
4403 ov_oos_print(mdev);
4404
4405 drbd_rs_complete_io(mdev, sector);
4406 dec_rs_pending(mdev);
4407
4408 if (--mdev->ov_left == 0) {
4409 w = kmalloc(sizeof(*w), GFP_NOIO);
4410 if (w) {
4411 w->cb = w_ov_finished;
4412 drbd_queue_work_front(&mdev->data.work, w);
4413 } else {
4414 dev_err(DEV, "kmalloc(w) failed.");
4415 ov_oos_print(mdev);
4416 drbd_resync_finished(mdev);
4417 }
4418 }
4419 return TRUE;
4420}
4421
Lars Ellenberge7f52df2010-08-03 20:20:20 +02004422static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h)
Philipp Reisner0ced55a2010-04-30 15:26:20 +02004423{
Lars Ellenberge7f52df2010-08-03 20:20:20 +02004424 /* IGNORE */
Philipp Reisner0ced55a2010-04-30 15:26:20 +02004425 return TRUE;
4426}
4427
Philipp Reisnerb411b362009-09-25 16:07:19 -07004428struct asender_cmd {
4429 size_t pkt_size;
4430 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4431};
4432
4433static struct asender_cmd *get_asender_cmd(int cmd)
4434{
4435 static struct asender_cmd asender_tbl[] = {
4436 /* anything missing from this table is in
4437 * the drbd_cmd_handler (drbd_default_handler) table,
4438 * see the beginning of drbdd() */
4439 [P_PING] = { sizeof(struct p_header), got_Ping },
4440 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4441 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4442 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4443 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4444 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4445 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4446 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4447 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4448 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4449 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4450 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4451 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
Lars Ellenberge7f52df2010-08-03 20:20:20 +02004452 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_something_to_ignore_m },
Philipp Reisnerb411b362009-09-25 16:07:19 -07004453 [P_MAX_CMD] = { 0, NULL },
4454 };
4455 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4456 return NULL;
4457 return &asender_tbl[cmd];
4458}
4459
4460int drbd_asender(struct drbd_thread *thi)
4461{
4462 struct drbd_conf *mdev = thi->mdev;
4463 struct p_header *h = &mdev->meta.rbuf.header;
4464 struct asender_cmd *cmd = NULL;
4465
4466 int rv, len;
4467 void *buf = h;
4468 int received = 0;
4469 int expect = sizeof(struct p_header);
4470 int empty;
4471
4472 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4473
4474 current->policy = SCHED_RR; /* Make this a realtime task! */
4475 current->rt_priority = 2; /* more important than all other tasks */
4476
4477 while (get_t_state(thi) == Running) {
4478 drbd_thread_current_set_cpu(mdev);
4479 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4480 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4481 mdev->meta.socket->sk->sk_rcvtimeo =
4482 mdev->net_conf->ping_timeo*HZ/10;
4483 }
4484
4485 /* conditionally cork;
4486 * it may hurt latency if we cork without much to send */
4487 if (!mdev->net_conf->no_cork &&
4488 3 < atomic_read(&mdev->unacked_cnt))
4489 drbd_tcp_cork(mdev->meta.socket);
4490 while (1) {
4491 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4492 flush_signals(current);
4493 if (!drbd_process_done_ee(mdev)) {
4494 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4495 goto reconnect;
4496 }
4497 /* to avoid race with newly queued ACKs */
4498 set_bit(SIGNAL_ASENDER, &mdev->flags);
4499 spin_lock_irq(&mdev->req_lock);
4500 empty = list_empty(&mdev->done_ee);
4501 spin_unlock_irq(&mdev->req_lock);
4502 /* new ack may have been queued right here,
4503 * but then there is also a signal pending,
4504 * and we start over... */
4505 if (empty)
4506 break;
4507 }
4508 /* but unconditionally uncork unless disabled */
4509 if (!mdev->net_conf->no_cork)
4510 drbd_tcp_uncork(mdev->meta.socket);
4511
4512 /* short circuit, recv_msg would return EINTR anyways. */
4513 if (signal_pending(current))
4514 continue;
4515
4516 rv = drbd_recv_short(mdev, mdev->meta.socket,
4517 buf, expect-received, 0);
4518 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4519
4520 flush_signals(current);
4521
4522 /* Note:
4523 * -EINTR (on meta) we got a signal
4524 * -EAGAIN (on meta) rcvtimeo expired
4525 * -ECONNRESET other side closed the connection
4526 * -ERESTARTSYS (on data) we got a signal
4527 * rv < 0 other than above: unexpected error!
4528 * rv == expected: full header or command
4529 * rv < expected: "woken" by signal during receive
4530 * rv == 0 : "connection shut down by peer"
4531 */
4532 if (likely(rv > 0)) {
4533 received += rv;
4534 buf += rv;
4535 } else if (rv == 0) {
4536 dev_err(DEV, "meta connection shut down by peer.\n");
4537 goto reconnect;
4538 } else if (rv == -EAGAIN) {
4539 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4540 mdev->net_conf->ping_timeo*HZ/10) {
4541 dev_err(DEV, "PingAck did not arrive in time.\n");
4542 goto reconnect;
4543 }
4544 set_bit(SEND_PING, &mdev->flags);
4545 continue;
4546 } else if (rv == -EINTR) {
4547 continue;
4548 } else {
4549 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4550 goto reconnect;
4551 }
4552
4553 if (received == expect && cmd == NULL) {
4554 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4555 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4556 (long)be32_to_cpu(h->magic),
4557 h->command, h->length);
4558 goto reconnect;
4559 }
4560 cmd = get_asender_cmd(be16_to_cpu(h->command));
4561 len = be16_to_cpu(h->length);
4562 if (unlikely(cmd == NULL)) {
4563 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4564 (long)be32_to_cpu(h->magic),
4565 h->command, h->length);
4566 goto disconnect;
4567 }
4568 expect = cmd->pkt_size;
Jens Axboe6a0afdf2009-10-01 09:04:14 +02004569 ERR_IF(len != expect-sizeof(struct p_header))
Philipp Reisnerb411b362009-09-25 16:07:19 -07004570 goto reconnect;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004571 }
4572 if (received == expect) {
4573 D_ASSERT(cmd != NULL);
Philipp Reisnerb411b362009-09-25 16:07:19 -07004574 if (!cmd->process(mdev, h))
4575 goto reconnect;
4576
4577 buf = h;
4578 received = 0;
4579 expect = sizeof(struct p_header);
4580 cmd = NULL;
4581 }
4582 }
4583
4584 if (0) {
4585reconnect:
4586 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4587 }
4588 if (0) {
4589disconnect:
4590 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4591 }
4592 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4593
4594 D_ASSERT(mdev->state.conn < C_CONNECTED);
4595 dev_info(DEV, "asender terminated\n");
4596
4597 return 0;
4598}