1/*
2 drbd_receiver.c
3
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
14
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/smp_lock.h>
40#include <linux/pkt_sched.h>
41#define __KERNEL_SYSCALLS__
42#include <linux/unistd.h>
43#include <linux/vmalloc.h>
44#include <linux/random.h>
45#include <linux/mm.h>
46#include <linux/string.h>
47#include <linux/scatterlist.h>
48#include "drbd_int.h"
49#include "drbd_req.h"
50
51#include "drbd_vli.h"
52
53struct flush_work {
54 struct drbd_work w;
55 struct drbd_epoch *epoch;
56};
57
58enum finish_epoch {
59 FE_STILL_LIVE,
60 FE_DESTROYED,
61 FE_RECYCLED,
62};
63
64static int drbd_do_handshake(struct drbd_conf *mdev);
65static int drbd_do_auth(struct drbd_conf *mdev);
66
67static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69
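/* Return the epoch preceding @epoch, or NULL if there is none (the list
 * wraps around to @epoch itself or to mdev->current_epoch). Protected by
 * epoch_lock. */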
70static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71{
72 struct drbd_epoch *prev;
73 spin_lock(&mdev->epoch_lock);
74 prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 if (prev == epoch || prev == mdev->current_epoch)
76 prev = NULL;
77 spin_unlock(&mdev->epoch_lock);
78 return prev;
79}
80
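/* Best-effort allocation flags for receive buffer pages: highmem is fine and
 * allocation failures are expected, so do not warn; without __GFP_WAIT the
 * allocator will not sleep or start write-out, callers simply retry later. */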
81#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
82
83/*
84 * some helper functions to deal with singly linked page lists,
85 * page->private being our "next" pointer.
86 */
87
88/* If at least n pages are linked at head, get n pages off.
89 * Otherwise, don't modify head, and return NULL.
90 * Locking is the responsibility of the caller.
91 */
92static struct page *page_chain_del(struct page **head, int n)
93{
94 struct page *page;
95 struct page *tmp;
96
97 BUG_ON(!n);
98 BUG_ON(!head);
99
100 page = *head;
101 while (page) {
102 tmp = page_chain_next(page);
103 if (--n == 0)
104 break; /* found sufficient pages */
105 if (tmp == NULL)
106 /* insufficient pages, don't use any of them. */
107 return NULL;
108 page = tmp;
109 }
110
111 /* add end of list marker for the returned list */
112 set_page_private(page, 0);
113 /* actual return value, and adjustment of head */
114 page = *head;
115 *head = tmp;
116 return page;
117}
118
119/* may be used outside of locks to find the tail of a (usually short)
120 * "private" page chain, before adding it back to a global chain head
121 * with page_chain_add() under a spinlock. */
122static struct page *page_chain_tail(struct page *page, int *len)
123{
124 struct page *tmp;
125 int i = 1;
126 while ((tmp = page_chain_next(page)))
127 ++i, page = tmp;
128 if (len)
129 *len = i;
130 return page;
131}
132
133static int page_chain_free(struct page *page)
134{
135 struct page *tmp;
136 int i = 0;
137 page_chain_for_each_safe(page, tmp) {
138 put_page(page);
139 ++i;
140 }
141 return i;
142}
143
144static void page_chain_add(struct page **head,
145 struct page *chain_first, struct page *chain_last)
146{
147#if 1
148 struct page *tmp;
149 tmp = page_chain_tail(chain_first, NULL);
150 BUG_ON(tmp != chain_last);
151#endif
152
153 /* add chain to head */
154 set_page_private(chain_last, (unsigned long)*head);
155 *head = chain_first;
156}
157
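/* Grab @number pages in one go: preferably from the global drbd_pp_pool
 * (only if enough pages are vacant), otherwise by allocating fresh pages
 * with GFP_TRY. If not all @number pages can be obtained, the partial
 * chain is given back to the pool and NULL is returned; drbd_pp_alloc()
 * will retry "soon". */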
158static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
159{
160 struct page *page = NULL;
161 struct page *tmp = NULL;
162 int i = 0;
163
164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
165 * So what. It saves a spin_lock. */
166 if (drbd_pp_vacant >= number) {
167 spin_lock(&drbd_pp_lock);
168 page = page_chain_del(&drbd_pp_pool, number);
169 if (page)
170 drbd_pp_vacant -= number;
171 spin_unlock(&drbd_pp_lock);
172 if (page)
173 return page;
174 }
175
176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 * which in turn might block on the other node at this very place. */
179 for (i = 0; i < number; i++) {
180 tmp = alloc_page(GFP_TRY);
181 if (!tmp)
182 break;
183 set_page_private(tmp, (unsigned long)page);
184 page = tmp;
185 }
186
187 if (i == number)
188 return page;
189
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_pp_alloc will retry this
192 * function "soon". */
193 if (page) {
194 tmp = page_chain_tail(page, NULL);
195 spin_lock(&drbd_pp_lock);
196 page_chain_add(&drbd_pp_pool, page, tmp);
197 drbd_pp_vacant += i;
198 spin_unlock(&drbd_pp_lock);
199 }
200 return NULL;
201}
202
203/* kick lower level device, if we have more than (arbitrary number)
204 * reference counts on it, which typically are locally submitted io
205 * requests. don't use unacked_cnt, so we speed up proto A and B, too. */
206static void maybe_kick_lo(struct drbd_conf *mdev)
207{
208 if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
209 drbd_kick_lo(mdev);
210}
211
212static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
213{
214 struct drbd_epoch_entry *e;
215 struct list_head *le, *tle;
216
217 /* The EEs are always appended to the end of the list. Since
218 they are sent in order over the wire, they have to finish
219 in order. As soon as we see the first unfinished one, we can
220 stop examining the list... */
221
222 list_for_each_safe(le, tle, &mdev->net_ee) {
223 e = list_entry(le, struct drbd_epoch_entry, w.list);
224 if (drbd_ee_has_active_page(e))
225 break;
226 list_move(le, to_be_freed);
227 }
228}
229
230static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
231{
232 LIST_HEAD(reclaimed);
233 struct drbd_epoch_entry *e, *t;
234
235 maybe_kick_lo(mdev);
236 spin_lock_irq(&mdev->req_lock);
237 reclaim_net_ee(mdev, &reclaimed);
238 spin_unlock_irq(&mdev->req_lock);
239
240 list_for_each_entry_safe(e, t, &reclaimed, w.list)
241 drbd_free_ee(mdev, e);
242}
243
244/**
245 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
246 * @mdev: DRBD device.
247 * @number: number of pages requested
248 * @retry: whether to retry, if not enough pages are available right now
249 *
250 * Tries to allocate number pages, first from our own page pool, then from
251 * the kernel, unless this allocation would exceed the max_buffers setting.
252 * Possibly retry until DRBD frees sufficient pages somewhere else.
253 *
254 * Returns a page chain linked via page->private.
255 */
256static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
257{
258 struct page *page = NULL;
259 DEFINE_WAIT(wait);
260
261 /* Yes, we may run up to @number over max_buffers. If we
262 * follow it strictly, the admin will get it wrong anyways. */
263 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
264 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
265
266 while (page == NULL) {
267 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
268
269 drbd_kick_lo_and_reclaim_net(mdev);
270
271 if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
272 page = drbd_pp_first_pages_or_try_alloc(mdev, number);
273 if (page)
274 break;
275 }
276
277 if (!retry)
278 break;
279
280 if (signal_pending(current)) {
281 dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
282 break;
283 }
284
285 schedule();
286 }
287 finish_wait(&drbd_pp_wait, &wait);
288
289 if (page)
290 atomic_add(number, &mdev->pp_in_use);
291 return page;
292}
293
294/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
295 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
296 * Either links the page chain back to the global pool,
297 * or returns all pages to the system. */
298static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
299{
300 int i;
301 if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
302 i = page_chain_free(page);
303 else {
304 struct page *tmp;
305 tmp = page_chain_tail(page, &i);
306 spin_lock(&drbd_pp_lock);
307 page_chain_add(&drbd_pp_pool, page, tmp);
308 drbd_pp_vacant += i;
309 spin_unlock(&drbd_pp_lock);
310 }
311 atomic_sub(i, &mdev->pp_in_use);
312 i = atomic_read(&mdev->pp_in_use);
313 if (i < 0)
314 dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
315 wake_up(&drbd_pp_wait);
316}
317
318/*
319You need to hold the req_lock:
320 _drbd_wait_ee_list_empty()
321
322You must not have the req_lock:
323 drbd_free_ee()
324 drbd_alloc_ee()
325 drbd_init_ee()
326 drbd_release_ee()
327 drbd_ee_fix_bhs()
328 drbd_process_done_ee()
329 drbd_clear_done_ee()
330 drbd_wait_ee_list_empty()
331*/
332
333struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
334 u64 id,
335 sector_t sector,
336 unsigned int data_size,
337 gfp_t gfp_mask) __must_hold(local)
338{
339 struct drbd_epoch_entry *e;
340 struct page *page;
341 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
342
343 if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
344 return NULL;
345
346 e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
347 if (!e) {
348 if (!(gfp_mask & __GFP_NOWARN))
349 dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
350 return NULL;
351 }
352
353 page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
354 if (!page)
355 goto fail;
356
357 INIT_HLIST_NODE(&e->colision);
358 e->epoch = NULL;
359 e->mdev = mdev;
360 e->pages = page;
361 atomic_set(&e->pending_bios, 0);
362 e->size = data_size;
363 e->flags = 0;
364 e->sector = sector;
366 e->block_id = id;
367
368 return e;
369
370 fail:
371 mempool_free(e, drbd_ee_mempool);
372 return NULL;
373}
374
375void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
376{
377 drbd_pp_free(mdev, e->pages);
378 D_ASSERT(atomic_read(&e->pending_bios) == 0);
379 D_ASSERT(hlist_unhashed(&e->colision));
380 mempool_free(e, drbd_ee_mempool);
381}
382
383int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
384{
385 LIST_HEAD(work_list);
386 struct drbd_epoch_entry *e, *t;
387 int count = 0;
388
389 spin_lock_irq(&mdev->req_lock);
390 list_splice_init(list, &work_list);
391 spin_unlock_irq(&mdev->req_lock);
392
393 list_for_each_entry_safe(e, t, &work_list, w.list) {
394 drbd_free_ee(mdev, e);
395 count++;
396 }
397 return count;
398}
399
400
401/*
402 * This function is called from _asender only_
403 * but see also comments in _req_mod(,barrier_acked)
404 * and receive_Barrier.
405 *
406 * Move entries from net_ee to done_ee, if ready.
407 * Grab done_ee, call all callbacks, free the entries.
408 * The callbacks typically send out ACKs.
409 */
410static int drbd_process_done_ee(struct drbd_conf *mdev)
411{
412 LIST_HEAD(work_list);
413 LIST_HEAD(reclaimed);
414 struct drbd_epoch_entry *e, *t;
415 int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
416
417 spin_lock_irq(&mdev->req_lock);
418 reclaim_net_ee(mdev, &reclaimed);
419 list_splice_init(&mdev->done_ee, &work_list);
420 spin_unlock_irq(&mdev->req_lock);
421
422 list_for_each_entry_safe(e, t, &reclaimed, w.list)
423 drbd_free_ee(mdev, e);
424
425 /* possible callbacks here:
426 * e_end_block, and e_end_resync_block, e_send_discard_ack.
427 * all ignore the last argument.
428 */
429 list_for_each_entry_safe(e, t, &work_list, w.list) {
430 /* list_del not necessary, next/prev members not touched */
431 ok = e->w.cb(mdev, &e->w, !ok) && ok;
432 drbd_free_ee(mdev, e);
433 }
434 wake_up(&mdev->ee_wait);
435
436 return ok;
437}
438
439void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
440{
441 DEFINE_WAIT(wait);
442
443 /* avoids spin_lock/unlock
444 * and calling prepare_to_wait in the fast path */
445 while (!list_empty(head)) {
446 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
447 spin_unlock_irq(&mdev->req_lock);
448 drbd_kick_lo(mdev);
449 schedule();
450 finish_wait(&mdev->ee_wait, &wait);
451 spin_lock_irq(&mdev->req_lock);
452 }
453}
454
455void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
456{
457 spin_lock_irq(&mdev->req_lock);
458 _drbd_wait_ee_list_empty(mdev, head);
459 spin_unlock_irq(&mdev->req_lock);
460}
461
462/* see also kernel_accept, which is only present since 2.6.18.
463 * We also want to log exactly which part of it failed. */
464static int drbd_accept(struct drbd_conf *mdev, const char **what,
465 struct socket *sock, struct socket **newsock)
466{
467 struct sock *sk = sock->sk;
468 int err = 0;
469
470 *what = "listen";
471 err = sock->ops->listen(sock, 5);
472 if (err < 0)
473 goto out;
474
475 *what = "sock_create_lite";
476 err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
477 newsock);
478 if (err < 0)
479 goto out;
480
481 *what = "accept";
482 err = sock->ops->accept(sock, *newsock, 0);
483 if (err < 0) {
484 sock_release(*newsock);
485 *newsock = NULL;
486 goto out;
487 }
488 (*newsock)->ops = sock->ops;
489
490out:
491 return err;
492}
493
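/* One-shot receive on an explicitly given socket; used for the initial
 * packets and for the connection probe in drbd_socket_okay(). Unlike
 * drbd_recv(), a short read here does not force the connection into
 * C_BROKEN_PIPE. */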
494static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
495 void *buf, size_t size, int flags)
496{
497 mm_segment_t oldfs;
498 struct kvec iov = {
499 .iov_base = buf,
500 .iov_len = size,
501 };
502 struct msghdr msg = {
503 .msg_iovlen = 1,
504 .msg_iov = (struct iovec *)&iov,
505 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506 };
507 int rv;
508
509 oldfs = get_fs();
510 set_fs(KERNEL_DS);
511 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512 set_fs(oldfs);
513
514 return rv;
515}
516
517static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
518{
519 mm_segment_t oldfs;
520 struct kvec iov = {
521 .iov_base = buf,
522 .iov_len = size,
523 };
524 struct msghdr msg = {
525 .msg_iovlen = 1,
526 .msg_iov = (struct iovec *)&iov,
527 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528 };
529 int rv;
530
531 oldfs = get_fs();
532 set_fs(KERNEL_DS);
533
534 for (;;) {
535 rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
536 if (rv == size)
537 break;
538
539 /* Note:
540 * ECONNRESET other side closed the connection
541 * ERESTARTSYS (on sock) we got a signal
542 */
543
544 if (rv < 0) {
545 if (rv == -ECONNRESET)
546 dev_info(DEV, "sock was reset by peer\n");
547 else if (rv != -ERESTARTSYS)
548 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
549 break;
550 } else if (rv == 0) {
551 dev_info(DEV, "sock was shut down by peer\n");
552 break;
553 } else {
554 /* signal came in, or peer/link went down,
555 * after we read a partial message
556 */
557 /* D_ASSERT(signal_pending(current)); */
558 break;
559 }
560 };
561
562 set_fs(oldfs);
563
564 if (rv != size)
565 drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
566
567 return rv;
568}
569
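/* Actively connect to the peer: create a TCP socket, bind it to the
 * configured local address (port 0, so the kernel picks a free one) and
 * connect to the peer address. Returns the connected socket or NULL;
 * only unexpected errors force the state to C_DISCONNECTING. */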
570static struct socket *drbd_try_connect(struct drbd_conf *mdev)
571{
572 const char *what;
573 struct socket *sock;
574 struct sockaddr_in6 src_in6;
575 int err;
576 int disconnect_on_error = 1;
577
578 if (!get_net_conf(mdev))
579 return NULL;
580
581 what = "sock_create_kern";
582 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
583 SOCK_STREAM, IPPROTO_TCP, &sock);
584 if (err < 0) {
585 sock = NULL;
586 goto out;
587 }
588
589 sock->sk->sk_rcvtimeo =
590 sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
591
592 /* explicitly bind to the configured IP as source IP
593 * for the outgoing connections.
594 * This is needed for multihomed hosts and to be
595 * able to use lo: interfaces for drbd.
596 * Make sure to use 0 as port number, so linux selects
597 * a free one dynamically.
598 */
599 memcpy(&src_in6, mdev->net_conf->my_addr,
600 min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
601 if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
602 src_in6.sin6_port = 0;
603 else
604 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
605
606 what = "bind before connect";
607 err = sock->ops->bind(sock,
608 (struct sockaddr *) &src_in6,
609 mdev->net_conf->my_addr_len);
610 if (err < 0)
611 goto out;
612
613 /* connect may fail, peer not yet available.
614 * stay C_WF_CONNECTION, don't go Disconnecting! */
615 disconnect_on_error = 0;
616 what = "connect";
617 err = sock->ops->connect(sock,
618 (struct sockaddr *)mdev->net_conf->peer_addr,
619 mdev->net_conf->peer_addr_len, 0);
620
621out:
622 if (err < 0) {
623 if (sock) {
624 sock_release(sock);
625 sock = NULL;
626 }
627 switch (-err) {
628 /* timeout, busy, signal pending */
629 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
630 case EINTR: case ERESTARTSYS:
631 /* peer not (yet) available, network problem */
632 case ECONNREFUSED: case ENETUNREACH:
633 case EHOSTDOWN: case EHOSTUNREACH:
634 disconnect_on_error = 0;
635 break;
636 default:
637 dev_err(DEV, "%s failed, err = %d\n", what, err);
638 }
639 if (disconnect_on_error)
640 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
641 }
642 put_net_conf(mdev);
643 return sock;
644}
645
646static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
647{
648 int timeo, err;
649 struct socket *s_estab = NULL, *s_listen;
650 const char *what;
651
652 if (!get_net_conf(mdev))
653 return NULL;
654
655 what = "sock_create_kern";
656 err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
657 SOCK_STREAM, IPPROTO_TCP, &s_listen);
658 if (err) {
659 s_listen = NULL;
660 goto out;
661 }
662
663 timeo = mdev->net_conf->try_connect_int * HZ;
664 timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
665
666 s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
667 s_listen->sk->sk_rcvtimeo = timeo;
668 s_listen->sk->sk_sndtimeo = timeo;
669
670 what = "bind before listen";
671 err = s_listen->ops->bind(s_listen,
672 (struct sockaddr *) mdev->net_conf->my_addr,
673 mdev->net_conf->my_addr_len);
674 if (err < 0)
675 goto out;
676
677 err = drbd_accept(mdev, &what, s_listen, &s_estab);
678
679out:
680 if (s_listen)
681 sock_release(s_listen);
682 if (err < 0) {
683 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
684 dev_err(DEV, "%s failed, err = %d\n", what, err);
685 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
686 }
687 }
688 put_net_conf(mdev);
689
690 return s_estab;
691}
692
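/* The "first packet" sent on a freshly established connection is a bare
 * header whose command (P_HAND_SHAKE_S or P_HAND_SHAKE_M) tells the peer
 * whether this connection is meant to become the data or the meta-data
 * socket. */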
693static int drbd_send_fp(struct drbd_conf *mdev,
694 struct socket *sock, enum drbd_packets cmd)
695{
696 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
697
698 return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
699}
700
701static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
702{
703 struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
704 int rr;
705
706 rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
707
708 if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
709 return be16_to_cpu(h->command);
710
711 return 0xffff;
712}
713
714/**
715 * drbd_socket_okay() - Free the socket if its connection is not okay
716 * @mdev: DRBD device.
717 * @sock: pointer to the pointer to the socket.
718 */
719static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
720{
721 int rr;
722 char tb[4];
723
724 if (!*sock)
725 return FALSE;
726
727 rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
728
729 if (rr > 0 || rr == -EAGAIN) {
730 return TRUE;
731 } else {
732 sock_release(*sock);
733 *sock = NULL;
734 return FALSE;
735 }
736}
737
738/*
739 * return values:
740 * 1 yes, we have a valid connection
741 * 0 oops, did not work out, please try again
742 * -1 peer talks different language,
743 * no point in trying again, please go standalone.
744 * -2 We do not have a network config...
745 */
746static int drbd_connect(struct drbd_conf *mdev)
747{
748 struct socket *s, *sock, *msock;
749 int try, h, ok;
750
751 D_ASSERT(!mdev->data.socket);
752
753 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
754 dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
755
756 if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757 return -2;
758
759 clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760
761 sock = NULL;
762 msock = NULL;
763
764 do {
765 for (try = 0;;) {
766 /* 3 tries, this should take less than a second! */
767 s = drbd_try_connect(mdev);
768 if (s || ++try >= 3)
769 break;
770 /* give the other side time to call bind() & listen() */
771 __set_current_state(TASK_INTERRUPTIBLE);
772 schedule_timeout(HZ / 10);
773 }
774
775 if (s) {
776 if (!sock) {
777 drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
778 sock = s;
779 s = NULL;
780 } else if (!msock) {
781 drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
782 msock = s;
783 s = NULL;
784 } else {
785 dev_err(DEV, "Logic error in drbd_connect()\n");
786 goto out_release_sockets;
787 }
788 }
789
790 if (sock && msock) {
791 __set_current_state(TASK_INTERRUPTIBLE);
792 schedule_timeout(HZ / 10);
793 ok = drbd_socket_okay(mdev, &sock);
794 ok = drbd_socket_okay(mdev, &msock) && ok;
795 if (ok)
796 break;
797 }
798
799retry:
800 s = drbd_wait_for_connect(mdev);
801 if (s) {
802 try = drbd_recv_fp(mdev, s);
803 drbd_socket_okay(mdev, &sock);
804 drbd_socket_okay(mdev, &msock);
805 switch (try) {
806 case P_HAND_SHAKE_S:
807 if (sock) {
808 dev_warn(DEV, "initial packet S crossed\n");
809 sock_release(sock);
810 }
811 sock = s;
812 break;
813 case P_HAND_SHAKE_M:
814 if (msock) {
815 dev_warn(DEV, "initial packet M crossed\n");
816 sock_release(msock);
817 }
818 msock = s;
819 set_bit(DISCARD_CONCURRENT, &mdev->flags);
820 break;
821 default:
822 dev_warn(DEV, "Error receiving initial packet\n");
823 sock_release(s);
824 if (random32() & 1)
825 goto retry;
826 }
827 }
828
829 if (mdev->state.conn <= C_DISCONNECTING)
830 goto out_release_sockets;
831 if (signal_pending(current)) {
832 flush_signals(current);
833 smp_rmb();
834 if (get_t_state(&mdev->receiver) == Exiting)
835 goto out_release_sockets;
836 }
837
838 if (sock && msock) {
839 ok = drbd_socket_okay(mdev, &sock);
840 ok = drbd_socket_okay(mdev, &msock) && ok;
841 if (ok)
842 break;
843 }
844 } while (1);
845
846 msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
847 sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
848
849 sock->sk->sk_allocation = GFP_NOIO;
850 msock->sk->sk_allocation = GFP_NOIO;
851
852 sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
853 msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
854
855 if (mdev->net_conf->sndbuf_size) {
856 sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
857 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
858 }
859
860 if (mdev->net_conf->rcvbuf_size) {
861 sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
862 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
863 }
864
865 /* NOT YET ...
866 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
867 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
868 * first set it to the P_HAND_SHAKE timeout,
869 * which we set to 4x the configured ping_timeout. */
870 sock->sk->sk_sndtimeo =
871 sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
872
873 msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
874 msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
875
876 /* we don't want delays.
877 * we use TCP_CORK where appropriate, though */
878 drbd_tcp_nodelay(sock);
879 drbd_tcp_nodelay(msock);
880
881 mdev->data.socket = sock;
882 mdev->meta.socket = msock;
883 mdev->last_received = jiffies;
884
885 D_ASSERT(mdev->asender.task == NULL);
886
887 h = drbd_do_handshake(mdev);
888 if (h <= 0)
889 return h;
890
891 if (mdev->cram_hmac_tfm) {
892 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
893 switch (drbd_do_auth(mdev)) {
894 case -1:
895 dev_err(DEV, "Authentication of peer failed\n");
896 return -1;
897 case 0:
898 dev_err(DEV, "Authentication of peer failed, trying again.\n");
899 return 0;
900 }
901 }
902
903 if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
904 return 0;
905
906 sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
907 sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
908
909 atomic_set(&mdev->packet_seq, 0);
910 mdev->peer_seq = 0;
911
912 drbd_thread_start(&mdev->asender);
913
914 if (!drbd_send_protocol(mdev))
915 return -1;
916 drbd_send_sync_param(mdev, &mdev->sync_conf);
917 drbd_send_sizes(mdev, 0, 0);
918 drbd_send_uuids(mdev);
919 drbd_send_state(mdev);
920 clear_bit(USE_DEGR_WFC_T, &mdev->flags);
921 clear_bit(RESIZE_PENDING, &mdev->flags);
922
923 return 1;
924
925out_release_sockets:
926 if (sock)
927 sock_release(sock);
928 if (msock)
929 sock_release(msock);
930 return -1;
931}
932
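/* Receive one packet header, convert command and length to host byte order
 * and verify the magic; returns FALSE on a short read or bad magic so the
 * caller can tear down the connection. */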
933static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
934{
935 int r;
936
937 r = drbd_recv(mdev, h, sizeof(*h));
938
939 if (unlikely(r != sizeof(*h))) {
940 dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
941 return FALSE;
942 };
943 h->command = be16_to_cpu(h->command);
944 h->length = be16_to_cpu(h->length);
945 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
946 dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
947 (long)be32_to_cpu(h->magic),
948 h->command, h->length);
949 return FALSE;
950 }
951 mdev->last_received = jiffies;
952
953 return TRUE;
954}
955
956static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
957{
958 int rv;
959
960 if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
961 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
962 NULL, BLKDEV_IFL_WAIT);
963 if (rv) {
964 dev_err(DEV, "local disk flush failed with status %d\n", rv);
965 /* would rather check on EOPNOTSUPP, but that is not reliable.
966 * don't try again for ANY return value != 0
967 * if (rv == -EOPNOTSUPP) */
968 drbd_bump_write_ordering(mdev, WO_drain_io);
969 }
970 put_ldev(mdev);
971 }
972
973 return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
974}
975
976static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
977{
978 struct flush_work *fw = (struct flush_work *)w;
979 struct drbd_epoch *epoch = fw->epoch;
980
981 kfree(w);
982
983 if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
984 drbd_flush_after_epoch(mdev, epoch);
985
986 drbd_may_finish_epoch(mdev, epoch, EV_PUT |
987 (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
988
989 return 1;
990}
991
992/**
993 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
994 * @mdev: DRBD device.
995 * @epoch: Epoch object.
996 * @ev: Epoch event.
997 */
998static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
999 struct drbd_epoch *epoch,
1000 enum epoch_event ev)
1001{
1002 int finish, epoch_size;
1003 struct drbd_epoch *next_epoch;
1004 int schedule_flush = 0;
1005 enum finish_epoch rv = FE_STILL_LIVE;
1006
1007 spin_lock(&mdev->epoch_lock);
1008 do {
1009 next_epoch = NULL;
1010 finish = 0;
1011
1012 epoch_size = atomic_read(&epoch->epoch_size);
1013
1014 switch (ev & ~EV_CLEANUP) {
1015 case EV_PUT:
1016 atomic_dec(&epoch->active);
1017 break;
1018 case EV_GOT_BARRIER_NR:
1019 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1020
1021 /* Special case: If we just switched from WO_bio_barrier to
1022 WO_bdev_flush we should not finish the current epoch */
1023 if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1024 mdev->write_ordering != WO_bio_barrier &&
1025 epoch == mdev->current_epoch)
1026 clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1027 break;
1028 case EV_BARRIER_DONE:
1029 set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1030 break;
1031 case EV_BECAME_LAST:
1032 /* nothing to do*/
1033 break;
1034 }
1035
1036 if (epoch_size != 0 &&
1037 atomic_read(&epoch->active) == 0 &&
1038 test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1039 epoch->list.prev == &mdev->current_epoch->list &&
1040 !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1041 /* Nearly all conditions are met to finish that epoch... */
1042 if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1043 mdev->write_ordering == WO_none ||
1044 (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1045 ev & EV_CLEANUP) {
1046 finish = 1;
1047 set_bit(DE_IS_FINISHING, &epoch->flags);
1048 } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1049 mdev->write_ordering == WO_bio_barrier) {
1050 atomic_inc(&epoch->active);
1051 schedule_flush = 1;
1052 }
1053 }
1054 if (finish) {
1055 if (!(ev & EV_CLEANUP)) {
1056 spin_unlock(&mdev->epoch_lock);
1057 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1058 spin_lock(&mdev->epoch_lock);
1059 }
1060 dec_unacked(mdev);
1061
1062 if (mdev->current_epoch != epoch) {
1063 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1064 list_del(&epoch->list);
1065 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1066 mdev->epochs--;
1067 kfree(epoch);
1068
1069 if (rv == FE_STILL_LIVE)
1070 rv = FE_DESTROYED;
1071 } else {
1072 epoch->flags = 0;
1073 atomic_set(&epoch->epoch_size, 0);
1074 /* atomic_set(&epoch->active, 0); is already zero */
1075 if (rv == FE_STILL_LIVE)
1076 rv = FE_RECYCLED;
1077 }
1078 }
1079
1080 if (!next_epoch)
1081 break;
1082
1083 epoch = next_epoch;
1084 } while (1);
1085
1086 spin_unlock(&mdev->epoch_lock);
1087
1088 if (schedule_flush) {
1089 struct flush_work *fw;
1090 fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1091 if (fw) {
1092 fw->w.cb = w_flush;
1093 fw->epoch = epoch;
1094 drbd_queue_work(&mdev->data.work, &fw->w);
1095 } else {
1096 dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1097 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1098 /* That is not a recursion, only one level */
1099 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1100 drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1101 }
1102 }
1103
1104 return rv;
1105}
1106
1107/**
1108 * drbd_bump_write_ordering() - Fall back to another write ordering method
1109 * @mdev: DRBD device.
1110 * @wo: Write ordering method to try.
1111 */
1112void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1113{
1114 enum write_ordering_e pwo;
1115 static char *write_ordering_str[] = {
1116 [WO_none] = "none",
1117 [WO_drain_io] = "drain",
1118 [WO_bdev_flush] = "flush",
1119 [WO_bio_barrier] = "barrier",
1120 };
1121
1122 pwo = mdev->write_ordering;
1123 wo = min(pwo, wo);
1124 if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1125 wo = WO_bdev_flush;
1126 if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1127 wo = WO_drain_io;
1128 if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1129 wo = WO_none;
1130 mdev->write_ordering = wo;
1131 if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1132 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1133}
1134
1135/**
1136 * drbd_submit_ee() - Submit the bios for an epoch entry
1137 * @mdev: DRBD device.
1138 * @e: epoch entry
1139 * @rw: flag field, see bio->bi_rw
1140 */
1141/* TODO allocate from our own bio_set. */
1142int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1143 const unsigned rw, const int fault_type)
1144{
1145 struct bio *bios = NULL;
1146 struct bio *bio;
1147 struct page *page = e->pages;
1148 sector_t sector = e->sector;
1149 unsigned ds = e->size;
1150 unsigned n_bios = 0;
1151 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1152
1153 /* In most cases, we will only need one bio. But in case the lower
1154 * level restrictions happen to be different at this offset on this
1155 * side than those of the sending peer, we may need to submit the
1156 * request in more than one bio. */
1157next_bio:
1158 bio = bio_alloc(GFP_NOIO, nr_pages);
1159 if (!bio) {
1160 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1161 goto fail;
1162 }
1163 /* > e->sector, unless this is the first bio */
1164 bio->bi_sector = sector;
1165 bio->bi_bdev = mdev->ldev->backing_bdev;
1166 /* we special case some flags in the multi-bio case, see below
1167 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1168 bio->bi_rw = rw;
1169 bio->bi_private = e;
1170 bio->bi_end_io = drbd_endio_sec;
1171
1172 bio->bi_next = bios;
1173 bios = bio;
1174 ++n_bios;
1175
1176 page_chain_for_each(page) {
1177 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1178 if (!bio_add_page(bio, page, len, 0)) {
1179 /* a single page must always be possible! */
1180 BUG_ON(bio->bi_vcnt == 0);
1181 goto next_bio;
1182 }
1183 ds -= len;
1184 sector += len >> 9;
1185 --nr_pages;
1186 }
1187 D_ASSERT(page == NULL);
1188 D_ASSERT(ds == 0);
1189
1190 atomic_set(&e->pending_bios, n_bios);
1191 do {
1192 bio = bios;
1193 bios = bios->bi_next;
1194 bio->bi_next = NULL;
1195
1196 /* strip off BIO_RW_UNPLUG unless it is the last bio */
1197 if (bios)
1198 bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1199
1200 drbd_generic_make_request(mdev, fault_type, bio);
1201
1202 /* strip off BIO_RW_BARRIER,
1203 * unless it is the first or last bio */
1204 if (bios && bios->bi_next)
1205 bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1206 } while (bios);
1207 maybe_kick_lo(mdev);
1208 return 0;
1209
1210fail:
1211 while (bios) {
1212 bio = bios;
1213 bios = bios->bi_next;
1214 bio_put(bio);
1215 }
1216 return -ENOMEM;
1217}
1218
1219/**
1220 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1221 * @mdev: DRBD device.
1222 * @w: work object.
1223 * @cancel: The connection will be closed anyways (unused in this callback)
1224 */
1225int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1226{
1227 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1228 /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1229 (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1230 so that we can finish that epoch in drbd_may_finish_epoch().
1231 That is necessary if we already have a long chain of Epochs, before
1232 we realize that BIO_RW_BARRIER is actually not supported */
1233
1234 /* As long as the -ENOTSUPP on the barrier is reported immediately
1235 that will never trigger. If it is reported late, we will just
1236 print that warning and continue correctly for all future requests
1237 with WO_bdev_flush */
1238 if (previous_epoch(mdev, e->epoch))
1239 dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1240
1241 /* we still have a local reference,
1242 * get_ldev was done in receive_Data. */
1243
1244 e->w.cb = e_end_block;
1245 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1246 /* drbd_submit_ee fails for one reason only:
1247 * if it was not able to allocate sufficient bios.
1248 * requeue, try again later. */
1249 e->w.cb = w_e_reissue;
1250 drbd_queue_work(&mdev->data.work, &e->w);
1251 }
1252 return 1;
1253}
1254
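/* P_BARRIER: the peer closed a write epoch. Record the barrier number, try
 * to finish the current epoch (flushing or draining the lower level device,
 * depending on the write ordering mode) and install a fresh epoch object
 * for the writes that follow. */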
1255static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1256{
1257 int rv, issue_flush;
1258 struct p_barrier *p = (struct p_barrier *)h;
1259 struct drbd_epoch *epoch;
1260
1261 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1262
1263 rv = drbd_recv(mdev, h->payload, h->length);
1264 ERR_IF(rv != h->length) return FALSE;
1265
1266 inc_unacked(mdev);
1267
1268 if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1269 drbd_kick_lo(mdev);
1270
1271 mdev->current_epoch->barrier_nr = p->barrier;
1272 rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1273
1274 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1275 * the activity log, which means it would not be resynced in case the
1276 * R_PRIMARY crashes now.
1277 * Therefore we must send the barrier_ack after the barrier request was
1278 * completed. */
1279 switch (mdev->write_ordering) {
1280 case WO_bio_barrier:
1281 case WO_none:
1282 if (rv == FE_RECYCLED)
1283 return TRUE;
1284 break;
1285
1286 case WO_bdev_flush:
1287 case WO_drain_io:
1288 if (rv == FE_STILL_LIVE) {
1289 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1290 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1291 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1292 }
1293 if (rv == FE_RECYCLED)
1294 return TRUE;
1295
1296 /* The asender will send all the ACKs and barrier ACKs out, since
1297 all EEs moved from the active_ee to the done_ee. We need to
1298 provide a new epoch object for the EEs that come in soon */
1299 break;
1300 }
1301
1302 /* receiver context, in the writeout path of the other node.
1303 * avoid potential distributed deadlock */
1304 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1305 if (!epoch) {
1306 dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1307 issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1308 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1309 if (issue_flush) {
1310 rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1311 if (rv == FE_RECYCLED)
1312 return TRUE;
1313 }
1314
1315 drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1316
1317 return TRUE;
1318 }
1319
1320 epoch->flags = 0;
1321 atomic_set(&epoch->epoch_size, 0);
1322 atomic_set(&epoch->active, 0);
1323
1324 spin_lock(&mdev->epoch_lock);
1325 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1326 list_add(&epoch->list, &mdev->current_epoch->list);
1327 mdev->current_epoch = epoch;
1328 mdev->epochs++;
1329 } else {
1330 /* The current_epoch got recycled while we allocated this one... */
1331 kfree(epoch);
1332 }
1333 spin_unlock(&mdev->epoch_lock);
1334
1335 return TRUE;
1336}
1337
1338/* used from receive_RSDataReply (recv_resync_read)
1339 * and from receive_Data */
1340static struct drbd_epoch_entry *
1341read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1342{
1343 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1344 struct drbd_epoch_entry *e;
1345 struct page *page;
1346 int dgs, ds, rr;
1347 void *dig_in = mdev->int_dig_in;
1348 void *dig_vv = mdev->int_dig_vv;
1349 unsigned long *data;
1350
1351 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1352 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1353
1354 if (dgs) {
1355 rr = drbd_recv(mdev, dig_in, dgs);
1356 if (rr != dgs) {
1357 dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1358 rr, dgs);
1359 return NULL;
1360 }
1361 }
1362
1363 data_size -= dgs;
1364
1365 ERR_IF(data_size & 0x1ff) return NULL;
1366 ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;
1367
1368 /* even though we trust our peer,
1369 * we sometimes have to double check. */
1370 if (sector + (data_size>>9) > capacity) {
1371 dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1372 (unsigned long long)capacity,
1373 (unsigned long long)sector, data_size);
1374 return NULL;
1375 }
1376
1377 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1378 * "criss-cross" setup, that might cause write-out on some other DRBD,
1379 * which in turn might block on the other node at this very place. */
1380 e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1381 if (!e)
1382 return NULL;
1383
1384 ds = data_size;
1385 page = e->pages;
1386 page_chain_for_each(page) {
1387 unsigned len = min_t(int, ds, PAGE_SIZE);
1388 data = kmap(page);
1389 rr = drbd_recv(mdev, data, len);
1390 if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1391 dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1392 data[0] = data[0] ^ (unsigned long)-1;
1393 }
1394 kunmap(page);
1395 if (rr != len) {
1396 drbd_free_ee(mdev, e);
1397 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1398 rr, len);
1399 return NULL;
1400 }
1401 ds -= rr;
1402 }
1403
1404 if (dgs) {
1405 drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1406 if (memcmp(dig_in, dig_vv, dgs)) {
1407 dev_err(DEV, "Digest integrity check FAILED.\n");
1408 drbd_bcast_ee(mdev, "digest failed",
1409 dgs, dig_in, dig_vv, e);
1410 drbd_free_ee(mdev, e);
1411 return NULL;
1412 }
1413 }
1414 mdev->recv_cnt += data_size>>9;
1415 return e;
1416}
1417
1418/* drbd_drain_block() just takes a data block
1419 * out of the socket input buffer, and discards it.
1420 */
1421static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1422{
1423 struct page *page;
1424 int rr, rv = 1;
1425 void *data;
1426
1427 if (!data_size)
1428 return TRUE;
1429
1430 page = drbd_pp_alloc(mdev, 1, 1);
1431
1432 data = kmap(page);
1433 while (data_size) {
1434 rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1435 if (rr != min_t(int, data_size, PAGE_SIZE)) {
1436 rv = 0;
1437 dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1438 rr, min_t(int, data_size, PAGE_SIZE));
1439 break;
1440 }
1441 data_size -= rr;
1442 }
1443 kunmap(page);
1444 drbd_pp_free(mdev, page);
1445 return rv;
1446}
1447
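/* "Diskless" read reply: copy the incoming data directly into the bio of the
 * original request (the read was served by the peer, e.g. because the local
 * disk was not usable), verifying the optional data digest along the way. */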
1448static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1449 sector_t sector, int data_size)
1450{
1451 struct bio_vec *bvec;
1452 struct bio *bio;
1453 int dgs, rr, i, expect;
1454 void *dig_in = mdev->int_dig_in;
1455 void *dig_vv = mdev->int_dig_vv;
1456
1457 dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1458 crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1459
1460 if (dgs) {
1461 rr = drbd_recv(mdev, dig_in, dgs);
1462 if (rr != dgs) {
1463 dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1464 rr, dgs);
1465 return 0;
1466 }
1467 }
1468
1469 data_size -= dgs;
1470
1471 /* optimistically update recv_cnt. if receiving fails below,
1472 * we disconnect anyways, and counters will be reset. */
1473 mdev->recv_cnt += data_size>>9;
1474
1475 bio = req->master_bio;
1476 D_ASSERT(sector == bio->bi_sector);
1477
1478 bio_for_each_segment(bvec, bio, i) {
1479 expect = min_t(int, data_size, bvec->bv_len);
1480 rr = drbd_recv(mdev,
1481 kmap(bvec->bv_page)+bvec->bv_offset,
1482 expect);
1483 kunmap(bvec->bv_page);
1484 if (rr != expect) {
1485 dev_warn(DEV, "short read receiving data reply: "
1486 "read %d expected %d\n",
1487 rr, expect);
1488 return 0;
1489 }
1490 data_size -= rr;
1491 }
1492
1493 if (dgs) {
1494 drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1495 if (memcmp(dig_in, dig_vv, dgs)) {
1496 dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1497 return 0;
1498 }
1499 }
1500
1501 D_ASSERT(data_size == 0);
1502 return 1;
1503}
1504
1505/* e_end_resync_block() is called via
1506 * drbd_process_done_ee() by asender only */
1507static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1508{
1509 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1510 sector_t sector = e->sector;
1511 int ok;
1512
1513 D_ASSERT(hlist_unhashed(&e->colision));
1514
1515 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1516 drbd_set_in_sync(mdev, sector, e->size);
1517 ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1518 } else {
1519 /* Record failure to sync */
1520 drbd_rs_failed_io(mdev, sector, e->size);
1521
1522 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1523 }
1524 dec_unacked(mdev);
1525
1526 return ok;
1527}
1528
1529static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1530{
1531 struct drbd_epoch_entry *e;
1532
1533 e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1534 if (!e)
1535 goto fail;
1536
1537 dec_rs_pending(mdev);
1538
1539 inc_unacked(mdev);
1540 /* corresponding dec_unacked() in e_end_resync_block()
1541 * respective _drbd_clear_done_ee */
1542
1543 e->w.cb = e_end_resync_block;
1544
1545 spin_lock_irq(&mdev->req_lock);
1546 list_add(&e->w.list, &mdev->sync_ee);
1547 spin_unlock_irq(&mdev->req_lock);
1548
1549 if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1550 return TRUE;
1551
1552 drbd_free_ee(mdev, e);
1553fail:
1554 put_ldev(mdev);
1555 return FALSE;
1556}
1557
1558static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1559{
1560 struct drbd_request *req;
1561 sector_t sector;
1562 unsigned int header_size, data_size;
1563 int ok;
1564 struct p_data *p = (struct p_data *)h;
1565
1566 header_size = sizeof(*p) - sizeof(*h);
1567 data_size = h->length - header_size;
1568
1569 ERR_IF(data_size == 0) return FALSE;
1570
1571 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1572 return FALSE;
1573
1574 sector = be64_to_cpu(p->sector);
1575
1576 spin_lock_irq(&mdev->req_lock);
1577 req = _ar_id_to_req(mdev, p->block_id, sector);
1578 spin_unlock_irq(&mdev->req_lock);
1579 if (unlikely(!req)) {
1580 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1581 return FALSE;
1582 }
1583
1584 /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1585 * special casing it there for the various failure cases.
1586 * still no race with drbd_fail_pending_reads */
1587 ok = recv_dless_read(mdev, req, sector, data_size);
1588
1589 if (ok)
1590 req_mod(req, data_received);
1591 /* else: nothing. handled from drbd_disconnect...
1592 * I don't think we may complete this just yet
1593 * in case we are "on-disconnect: freeze" */
1594
1595 return ok;
1596}
1597
1598static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1599{
1600 sector_t sector;
1601 unsigned int header_size, data_size;
1602 int ok;
1603 struct p_data *p = (struct p_data *)h;
1604
1605 header_size = sizeof(*p) - sizeof(*h);
1606 data_size = h->length - header_size;
1607
1608 ERR_IF(data_size == 0) return FALSE;
1609
1610 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1611 return FALSE;
1612
1613 sector = be64_to_cpu(p->sector);
1614 D_ASSERT(p->block_id == ID_SYNCER);
1615
1616 if (get_ldev(mdev)) {
1617 /* data is submitted to disk within recv_resync_read.
1618 * corresponding put_ldev done below on error,
1619 * or in drbd_endio_write_sec. */
1620 ok = recv_resync_read(mdev, sector, data_size);
1621 } else {
1622 if (__ratelimit(&drbd_ratelimit_state))
1623 dev_err(DEV, "Can not write resync data to local disk.\n");
1624
1625 ok = drbd_drain_block(mdev, data_size);
1626
1627 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1628 }
1629
1630 return ok;
1631}
1632
1633/* e_end_block() is called via drbd_process_done_ee().
1634 * this means this function only runs in the asender thread
1635 */
1636static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1637{
1638 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1639 sector_t sector = e->sector;
1640 struct drbd_epoch *epoch;
1641 int ok = 1, pcmd;
1642
1643 if (e->flags & EE_IS_BARRIER) {
1644 epoch = previous_epoch(mdev, e->epoch);
1645 if (epoch)
1646 drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1647 }
1648
1649 if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1650 if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1651 pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1652 mdev->state.conn <= C_PAUSED_SYNC_T &&
1653 e->flags & EE_MAY_SET_IN_SYNC) ?
1654 P_RS_WRITE_ACK : P_WRITE_ACK;
1655 ok &= drbd_send_ack(mdev, pcmd, e);
1656 if (pcmd == P_RS_WRITE_ACK)
1657 drbd_set_in_sync(mdev, sector, e->size);
1658 } else {
1659 ok = drbd_send_ack(mdev, P_NEG_ACK, e);
1660 /* we expect it to be marked out of sync anyways...
1661 * maybe assert this? */
1662 }
1663 dec_unacked(mdev);
1664 }
1665 /* we delete from the conflict detection hash _after_ we sent out the
1666 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1667 if (mdev->net_conf->two_primaries) {
1668 spin_lock_irq(&mdev->req_lock);
1669 D_ASSERT(!hlist_unhashed(&e->colision));
1670 hlist_del_init(&e->colision);
1671 spin_unlock_irq(&mdev->req_lock);
1672 } else {
1673 D_ASSERT(hlist_unhashed(&e->colision));
1674 }
1675
1676 drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1677
1678 return ok;
1679}
1680
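/* Two-primaries only: acknowledge a conflicting write from the peer with
 * P_DISCARD_ACK (its data was discarded in favour of a concurrent local
 * request), and remove the entry from the conflict detection hash. */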
1681static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1682{
1683 struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1684 int ok = 1;
1685
1686 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1687 ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1688
1689 spin_lock_irq(&mdev->req_lock);
1690 D_ASSERT(!hlist_unhashed(&e->colision));
1691 hlist_del_init(&e->colision);
1692 spin_unlock_irq(&mdev->req_lock);
1693
1694 dec_unacked(mdev);
1695
1696 return ok;
1697}
1698
1699/* Called from receive_Data.
1700 * Synchronize packets on sock with packets on msock.
1701 *
1702 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1703 * packet traveling on msock, they are still processed in the order they have
1704 * been sent.
1705 *
1706 * Note: we don't care for Ack packets overtaking P_DATA packets.
1707 *
1708 * In case packet_seq is larger than mdev->peer_seq number, there are
1709 * outstanding packets on the msock. We wait for them to arrive.
1710 * In case we are the logically next packet, we update mdev->peer_seq
1711 * ourselves. Correctly handles 32bit wrap around.
1712 *
1713 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1714 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1715 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1716 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1717 *
1718 * returns 0 if we may process the packet,
1719 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1720static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1721{
1722 DEFINE_WAIT(wait);
1723 unsigned int p_seq;
1724 long timeout;
1725 int ret = 0;
1726 spin_lock(&mdev->peer_seq_lock);
1727 for (;;) {
1728 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1729 if (seq_le(packet_seq, mdev->peer_seq+1))
1730 break;
1731 if (signal_pending(current)) {
1732 ret = -ERESTARTSYS;
1733 break;
1734 }
1735 p_seq = mdev->peer_seq;
1736 spin_unlock(&mdev->peer_seq_lock);
1737 timeout = schedule_timeout(30*HZ);
1738 spin_lock(&mdev->peer_seq_lock);
1739 if (timeout == 0 && p_seq == mdev->peer_seq) {
1740 ret = -ETIMEDOUT;
1741 dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1742 break;
1743 }
1744 }
1745 finish_wait(&mdev->seq_wait, &wait);
1746 if (mdev->peer_seq+1 == packet_seq)
1747 mdev->peer_seq++;
1748 spin_unlock(&mdev->peer_seq_lock);
1749 return ret;
1750}
1751
1752/* mirrored write */
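/* Receive one P_DATA packet: read the mirrored block into an epoch entry,
 * take care of write ordering (barriers) and, with two primaries, of
 * concurrent-write conflicts, then submit the write to the local disk. */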
1753static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1754{
1755 sector_t sector;
1756 struct drbd_epoch_entry *e;
1757 struct p_data *p = (struct p_data *)h;
1758 int header_size, data_size;
1759 int rw = WRITE;
1760 u32 dp_flags;
1761
1762 header_size = sizeof(*p) - sizeof(*h);
1763 data_size = h->length - header_size;
1764
1765 ERR_IF(data_size == 0) return FALSE;
1766
1767 if (drbd_recv(mdev, h->payload, header_size) != header_size)
1768 return FALSE;
1769
1770 if (!get_ldev(mdev)) {
1771 if (__ratelimit(&drbd_ratelimit_state))
1772 dev_err(DEV, "Can not write mirrored data block "
1773 "to local disk.\n");
1774 spin_lock(&mdev->peer_seq_lock);
1775 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1776 mdev->peer_seq++;
1777 spin_unlock(&mdev->peer_seq_lock);
1778
1779 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1780 atomic_inc(&mdev->current_epoch->epoch_size);
1781 return drbd_drain_block(mdev, data_size);
1782 }
1783
1784 /* get_ldev(mdev) successful.
1785 * Corresponding put_ldev done either below (on various errors),
1786 * or in drbd_endio_write_sec, if we successfully submit the data at
1787 * the end of this function. */
1788
1789 sector = be64_to_cpu(p->sector);
1790 e = read_in_block(mdev, p->block_id, sector, data_size);
1791 if (!e) {
1792 put_ldev(mdev);
1793 return FALSE;
1794 }
1795
1796	e->w.cb = e_end_block;
1797
1798 spin_lock(&mdev->epoch_lock);
1799 e->epoch = mdev->current_epoch;
1800 atomic_inc(&e->epoch->epoch_size);
1801 atomic_inc(&e->epoch->active);
1802
1803 if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1804 struct drbd_epoch *epoch;
1805 /* Issue a barrier if we start a new epoch, and the previous epoch
1806		   was not an epoch containing a single request which already was
1807 a Barrier. */
1808 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1809 if (epoch == e->epoch) {
1810 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1811			rw |= (1<<BIO_RW_BARRIER);
1812 e->flags |= EE_IS_BARRIER;
1813 } else {
1814 if (atomic_read(&epoch->epoch_size) > 1 ||
1815 !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1816 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1817				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1818				rw |= (1<<BIO_RW_BARRIER);
1819 e->flags |= EE_IS_BARRIER;
1820 }
1821 }
1822 }
1823 spin_unlock(&mdev->epoch_lock);
1824
1825 dp_flags = be32_to_cpu(p->dp_flags);
1826 if (dp_flags & DP_HARDBARRIER) {
1827 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1828 /* rw |= (1<<BIO_RW_BARRIER); */
1829 }
1830 if (dp_flags & DP_RW_SYNC)
1831 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1832 if (dp_flags & DP_MAY_SET_IN_SYNC)
1833 e->flags |= EE_MAY_SET_IN_SYNC;
1834
1835 /* I'm the receiver, I do hold a net_cnt reference. */
1836 if (!mdev->net_conf->two_primaries) {
1837 spin_lock_irq(&mdev->req_lock);
1838 } else {
1839 /* don't get the req_lock yet,
1840 * we may sleep in drbd_wait_peer_seq */
1841 const int size = e->size;
1842 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1843 DEFINE_WAIT(wait);
1844 struct drbd_request *i;
1845 struct hlist_node *n;
1846 struct hlist_head *slot;
1847 int first;
1848
1849 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1850 BUG_ON(mdev->ee_hash == NULL);
1851 BUG_ON(mdev->tl_hash == NULL);
1852
1853 /* conflict detection and handling:
1854 * 1. wait on the sequence number,
1855 * in case this data packet overtook ACK packets.
1856 * 2. check our hash tables for conflicting requests.
1857 * we only need to walk the tl_hash, since an ee can not
1858 * have a conflict with an other ee: on the submitting
1859 * node, the corresponding req had already been conflicting,
1860 * and a conflicting req is never sent.
1861 *
1862 * Note: for two_primaries, we are protocol C,
1863 * so there cannot be any request that is DONE
1864 * but still on the transfer log.
1865 *
1866 * unconditionally add to the ee_hash.
1867 *
1868 * if no conflicting request is found:
1869 * submit.
1870 *
1871 * if any conflicting request is found
1872 * that has not yet been acked,
1873 * AND I have the "discard concurrent writes" flag:
1874 * queue (via done_ee) the P_DISCARD_ACK; OUT.
1875 *
1876 * if any conflicting request is found:
1877 * block the receiver, waiting on misc_wait
1878 * until no more conflicting requests are there,
1879 * or we get interrupted (disconnect).
1880 *
1881 * we do not just write after local io completion of those
1882 * requests, but only after req is done completely, i.e.
1883 * we wait for the P_DISCARD_ACK to arrive!
1884 *
1885 * then proceed normally, i.e. submit.
1886 */
1887 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1888 goto out_interrupted;
1889
1890 spin_lock_irq(&mdev->req_lock);
1891
1892 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1893
1894#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1895 slot = tl_hash_slot(mdev, sector);
1896 first = 1;
1897 for (;;) {
1898 int have_unacked = 0;
1899 int have_conflict = 0;
1900 prepare_to_wait(&mdev->misc_wait, &wait,
1901 TASK_INTERRUPTIBLE);
1902 hlist_for_each_entry(i, n, slot, colision) {
1903 if (OVERLAPS) {
1904 /* only ALERT on first iteration,
1905 * we may be woken up early... */
1906 if (first)
1907 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1908 " new: %llus +%u; pending: %llus +%u\n",
1909 current->comm, current->pid,
1910 (unsigned long long)sector, size,
1911 (unsigned long long)i->sector, i->size);
1912 if (i->rq_state & RQ_NET_PENDING)
1913 ++have_unacked;
1914 ++have_conflict;
1915 }
1916 }
1917#undef OVERLAPS
1918 if (!have_conflict)
1919 break;
1920
1921 /* Discard Ack only for the _first_ iteration */
1922 if (first && discard && have_unacked) {
1923 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1924 (unsigned long long)sector);
1925 inc_unacked(mdev);
1926 e->w.cb = e_send_discard_ack;
1927 list_add_tail(&e->w.list, &mdev->done_ee);
1928
1929 spin_unlock_irq(&mdev->req_lock);
1930
1931 /* we could probably send that P_DISCARD_ACK ourselves,
1932 * but I don't like the receiver using the msock */
1933
1934 put_ldev(mdev);
1935 wake_asender(mdev);
1936 finish_wait(&mdev->misc_wait, &wait);
1937 return TRUE;
1938 }
1939
1940 if (signal_pending(current)) {
1941 hlist_del_init(&e->colision);
1942
1943 spin_unlock_irq(&mdev->req_lock);
1944
1945 finish_wait(&mdev->misc_wait, &wait);
1946 goto out_interrupted;
1947 }
1948
1949 spin_unlock_irq(&mdev->req_lock);
1950 if (first) {
1951 first = 0;
1952 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1953 "sec=%llus\n", (unsigned long long)sector);
1954 } else if (discard) {
1955 /* we had none on the first iteration.
1956 * there must be none now. */
1957 D_ASSERT(have_unacked == 0);
1958 }
1959 schedule();
1960 spin_lock_irq(&mdev->req_lock);
1961 }
1962 finish_wait(&mdev->misc_wait, &wait);
1963 }
1964
1965 list_add(&e->w.list, &mdev->active_ee);
1966 spin_unlock_irq(&mdev->req_lock);
1967
1968 switch (mdev->net_conf->wire_protocol) {
1969 case DRBD_PROT_C:
1970 inc_unacked(mdev);
1971 /* corresponding dec_unacked() in e_end_block()
1972 * respective _drbd_clear_done_ee */
1973 break;
1974 case DRBD_PROT_B:
1975 /* I really don't like it that the receiver thread
1976 * sends on the msock, but anyways */
1977 drbd_send_ack(mdev, P_RECV_ACK, e);
1978 break;
1979 case DRBD_PROT_A:
1980 /* nothing to do */
1981 break;
1982 }
1983
1984 if (mdev->state.pdsk == D_DISKLESS) {
1985 /* In case we have the only disk of the cluster, */
1986 drbd_set_out_of_sync(mdev, e->sector, e->size);
1987 e->flags |= EE_CALL_AL_COMPLETE_IO;
1988 drbd_al_begin_io(mdev, e->sector);
1989 }
1990
1991	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1992 return TRUE;
1993
1994out_interrupted:
1995 /* yes, the epoch_size now is imbalanced.
1996 * but we drop the connection anyways, so we don't have a chance to
1997 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1998 put_ldev(mdev);
1999 drbd_free_ee(mdev, e);
2000 return FALSE;
2001}
2002
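/* Receive a read request from the peer (P_DATA_REQUEST, P_RS_DATA_REQUEST,
 * P_CSUM_RS_REQUEST, P_OV_REPLY or P_OV_REQUEST): allocate an epoch entry,
 * set the matching worker callback, and submit the local read; the worker
 * then sends the reply. */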
2003static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2004{
2005 sector_t sector;
2006 const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2007 struct drbd_epoch_entry *e;
2008 struct digest_info *di = NULL;
2009 int size, digest_size;
2010 unsigned int fault_type;
2011 struct p_block_req *p =
2012 (struct p_block_req *)h;
2013 const int brps = sizeof(*p)-sizeof(*h);
2014
2015 if (drbd_recv(mdev, h->payload, brps) != brps)
2016 return FALSE;
2017
2018 sector = be64_to_cpu(p->sector);
2019 size = be32_to_cpu(p->blksize);
2020
2021 if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2022 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2023 (unsigned long long)sector, size);
2024 return FALSE;
2025 }
2026 if (sector + (size>>9) > capacity) {
2027 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2028 (unsigned long long)sector, size);
2029 return FALSE;
2030 }
2031
2032 if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2033 if (__ratelimit(&drbd_ratelimit_state))
2034 dev_err(DEV, "Can not satisfy peer's read request, "
2035 "no local data.\n");
2036 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2037 P_NEG_RS_DREPLY , p);
2038		return drbd_drain_block(mdev, h->length - brps);
2039	}
2040
2041 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2042 * "criss-cross" setup, that might cause write-out on some other DRBD,
2043 * which in turn might block on the other node at this very place. */
2044 e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2045 if (!e) {
2046 put_ldev(mdev);
2047 return FALSE;
2048 }
2049
2050	switch (h->command) {
2051 case P_DATA_REQUEST:
2052 e->w.cb = w_e_end_data_req;
2053 fault_type = DRBD_FAULT_DT_RD;
2054 break;
2055 case P_RS_DATA_REQUEST:
2056 e->w.cb = w_e_end_rsdata_req;
2057 fault_type = DRBD_FAULT_RS_RD;
2058		/* Eventually this should become asynchronous. Currently it
2059 * blocks the whole receiver just to delay the reading of a
2060 * resync data block.
2061 * the drbd_work_queue mechanism is made for this...
2062 */
2063 if (!drbd_rs_begin_io(mdev, sector)) {
2064 /* we have been interrupted,
2065 * probably connection lost! */
2066 D_ASSERT(signal_pending(current));
2067 goto out_free_e;
2068 }
2069 break;
2070
2071 case P_OV_REPLY:
2072 case P_CSUM_RS_REQUEST:
2073 fault_type = DRBD_FAULT_RS_RD;
2074		digest_size = h->length - brps;
2075 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2076 if (!di)
2077 goto out_free_e;
2078
2079 di->digest_size = digest_size;
2080 di->digest = (((char *)di)+sizeof(struct digest_info));
2081
2082 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2083 goto out_free_e;
2084
2085 e->block_id = (u64)(unsigned long)di;
2086 if (h->command == P_CSUM_RS_REQUEST) {
2087 D_ASSERT(mdev->agreed_pro_version >= 89);
2088 e->w.cb = w_e_end_csum_rs_req;
2089 } else if (h->command == P_OV_REPLY) {
2090 e->w.cb = w_e_end_ov_reply;
2091 dec_rs_pending(mdev);
2092 break;
2093 }
2094
2095 if (!drbd_rs_begin_io(mdev, sector)) {
2096 /* we have been interrupted, probably connection lost! */
2097 D_ASSERT(signal_pending(current));
2098 goto out_free_e;
2099 }
2100 break;
2101
2102 case P_OV_REQUEST:
2103 if (mdev->state.conn >= C_CONNECTED &&
2104 mdev->state.conn != C_VERIFY_T)
2105 dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2106 drbd_conn_str(mdev->state.conn));
2107 if (mdev->ov_start_sector == ~(sector_t)0 &&
2108 mdev->agreed_pro_version >= 90) {
2109 mdev->ov_start_sector = sector;
2110 mdev->ov_position = sector;
2111 mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2112 dev_info(DEV, "Online Verify start sector: %llu\n",
2113 (unsigned long long)sector);
2114 }
2115 e->w.cb = w_e_end_ov_req;
2116 fault_type = DRBD_FAULT_RS_RD;
2117 /* Eventually this should become asynchronous. Currently it
2118 * blocks the whole receiver just to delay the reading of a
2119 * resync data block.
2120 * the drbd_work_queue mechanism is made for this...
2121 */
2122 if (!drbd_rs_begin_io(mdev, sector)) {
2123 /* we have been interrupted,
2124 * probably connection lost! */
2125 D_ASSERT(signal_pending(current));
2126 goto out_free_e;
2127 }
2128 break;
2129
2130
2131 default:
2132 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2133 cmdname(h->command));
2134 fault_type = DRBD_FAULT_MAX;
2135 }
2136
2137 spin_lock_irq(&mdev->req_lock);
2138 list_add(&e->w.list, &mdev->read_ee);
2139 spin_unlock_irq(&mdev->req_lock);
2140
2141 inc_unacked(mdev);
2142
2143	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2144 return TRUE;
2145
2146out_free_e:
2147 kfree(di);
2148 put_ldev(mdev);
2149 drbd_free_ee(mdev, e);
2150 return FALSE;
2151}
2152
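/* After-split-brain auto-recovery when neither node is primary.
 * Return convention (also used by the 1p/2p variants below):
 *    1: discard the peer's data, sync from this node
 *   -1: discard the local data, sync from the peer
 * -100: no automatic resolution possible */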
2153static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2154{
2155 int self, peer, rv = -100;
2156 unsigned long ch_self, ch_peer;
2157
2158 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2159 peer = mdev->p_uuid[UI_BITMAP] & 1;
2160
2161 ch_peer = mdev->p_uuid[UI_SIZE];
2162 ch_self = mdev->comm_bm_set;
2163
2164 switch (mdev->net_conf->after_sb_0p) {
2165 case ASB_CONSENSUS:
2166 case ASB_DISCARD_SECONDARY:
2167 case ASB_CALL_HELPER:
2168 dev_err(DEV, "Configuration error.\n");
2169 break;
2170 case ASB_DISCONNECT:
2171 break;
2172 case ASB_DISCARD_YOUNGER_PRI:
2173 if (self == 0 && peer == 1) {
2174 rv = -1;
2175 break;
2176 }
2177 if (self == 1 && peer == 0) {
2178 rv = 1;
2179 break;
2180 }
2181 /* Else fall through to one of the other strategies... */
2182 case ASB_DISCARD_OLDER_PRI:
2183 if (self == 0 && peer == 1) {
2184 rv = 1;
2185 break;
2186 }
2187 if (self == 1 && peer == 0) {
2188 rv = -1;
2189 break;
2190 }
2191 /* Else fall through to one of the other strategies... */
2192		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2193		     "Using discard-least-changes instead\n");
2194 case ASB_DISCARD_ZERO_CHG:
2195 if (ch_peer == 0 && ch_self == 0) {
2196 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2197 ? -1 : 1;
2198 break;
2199 } else {
2200 if (ch_peer == 0) { rv = 1; break; }
2201 if (ch_self == 0) { rv = -1; break; }
2202 }
2203 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2204 break;
2205 case ASB_DISCARD_LEAST_CHG:
2206 if (ch_self < ch_peer)
2207 rv = -1;
2208 else if (ch_self > ch_peer)
2209 rv = 1;
2210 else /* ( ch_self == ch_peer ) */
2211 /* Well, then use something else. */
2212 rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2213 ? -1 : 1;
2214 break;
2215 case ASB_DISCARD_LOCAL:
2216 rv = -1;
2217 break;
2218 case ASB_DISCARD_REMOTE:
2219 rv = 1;
2220 }
2221
2222 return rv;
2223}
2224
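/* After-split-brain auto-recovery when exactly one node is primary.
 * Same return convention as drbd_asb_recover_0p(). */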
2225static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2226{
2227 int self, peer, hg, rv = -100;
2228
2229 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2230 peer = mdev->p_uuid[UI_BITMAP] & 1;
2231
2232 switch (mdev->net_conf->after_sb_1p) {
2233 case ASB_DISCARD_YOUNGER_PRI:
2234 case ASB_DISCARD_OLDER_PRI:
2235 case ASB_DISCARD_LEAST_CHG:
2236 case ASB_DISCARD_LOCAL:
2237 case ASB_DISCARD_REMOTE:
2238 dev_err(DEV, "Configuration error.\n");
2239 break;
2240 case ASB_DISCONNECT:
2241 break;
2242 case ASB_CONSENSUS:
2243 hg = drbd_asb_recover_0p(mdev);
2244 if (hg == -1 && mdev->state.role == R_SECONDARY)
2245 rv = hg;
2246 if (hg == 1 && mdev->state.role == R_PRIMARY)
2247 rv = hg;
2248 break;
2249 case ASB_VIOLENTLY:
2250 rv = drbd_asb_recover_0p(mdev);
2251 break;
2252 case ASB_DISCARD_SECONDARY:
2253 return mdev->state.role == R_PRIMARY ? 1 : -1;
2254 case ASB_CALL_HELPER:
2255 hg = drbd_asb_recover_0p(mdev);
2256 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2257 self = drbd_set_role(mdev, R_SECONDARY, 0);
2258 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2259 * we might be here in C_WF_REPORT_PARAMS which is transient.
2260 * we do not need to wait for the after state change work either. */
2261 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2262 if (self != SS_SUCCESS) {
2263 drbd_khelper(mdev, "pri-lost-after-sb");
2264 } else {
2265 dev_warn(DEV, "Successfully gave up primary role.\n");
2266 rv = hg;
2267 }
2268 } else
2269 rv = hg;
2270 }
2271
2272 return rv;
2273}
2274
2275static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2276{
2277 int self, peer, hg, rv = -100;
2278
2279 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2280 peer = mdev->p_uuid[UI_BITMAP] & 1;
2281
2282 switch (mdev->net_conf->after_sb_2p) {
2283 case ASB_DISCARD_YOUNGER_PRI:
2284 case ASB_DISCARD_OLDER_PRI:
2285 case ASB_DISCARD_LEAST_CHG:
2286 case ASB_DISCARD_LOCAL:
2287 case ASB_DISCARD_REMOTE:
2288 case ASB_CONSENSUS:
2289 case ASB_DISCARD_SECONDARY:
2290 dev_err(DEV, "Configuration error.\n");
2291 break;
2292 case ASB_VIOLENTLY:
2293 rv = drbd_asb_recover_0p(mdev);
2294 break;
2295 case ASB_DISCONNECT:
2296 break;
2297 case ASB_CALL_HELPER:
2298 hg = drbd_asb_recover_0p(mdev);
2299 if (hg == -1) {
2300 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2301 * we might be here in C_WF_REPORT_PARAMS which is transient.
2302 * we do not need to wait for the after state change work either. */
2303 self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2304 if (self != SS_SUCCESS) {
2305 drbd_khelper(mdev, "pri-lost-after-sb");
2306 } else {
2307 dev_warn(DEV, "Successfully gave up primary role.\n");
2308 rv = hg;
2309 }
2310 } else
2311 rv = hg;
2312 }
2313
2314 return rv;
2315}
2316
2317static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2318 u64 bits, u64 flags)
2319{
2320 if (!uuid) {
2321 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2322 return;
2323 }
2324 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2325 text,
2326 (unsigned long long)uuid[UI_CURRENT],
2327 (unsigned long long)uuid[UI_BITMAP],
2328 (unsigned long long)uuid[UI_HISTORY_START],
2329 (unsigned long long)uuid[UI_HISTORY_END],
2330 (unsigned long long)bits,
2331 (unsigned long long)flags);
2332}
2333
2334/*
2335 100 after split brain try auto recover
2336 2 C_SYNC_SOURCE set BitMap
2337 1 C_SYNC_SOURCE use BitMap
2338 0 no Sync
2339 -1 C_SYNC_TARGET use BitMap
2340 -2 C_SYNC_TARGET set BitMap
2341 -100 after split brain, disconnect
2342-1000 unrelated data
-1001 both sides need at least protocol version 91 to resolve this
2343 */
2344static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2345{
2346 u64 self, peer;
2347 int i, j;
2348
2349 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2350 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2351
2352 *rule_nr = 10;
2353 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2354 return 0;
2355
2356 *rule_nr = 20;
2357 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2358 peer != UUID_JUST_CREATED)
2359 return -2;
2360
2361 *rule_nr = 30;
2362 if (self != UUID_JUST_CREATED &&
2363 (peer == UUID_JUST_CREATED || peer == (u64)0))
2364 return 2;
2365
2366 if (self == peer) {
2367 int rct, dc; /* roles at crash time */
2368
2369 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2370
2371 if (mdev->agreed_pro_version < 91)
2372 return -1001;
2373
2374 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2375 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2376 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2377 drbd_uuid_set_bm(mdev, 0UL);
2378
2379 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2380 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2381 *rule_nr = 34;
2382 } else {
2383 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2384 *rule_nr = 36;
2385 }
2386
2387 return 1;
2388 }
2389
2390 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2391
2392 if (mdev->agreed_pro_version < 91)
2393 return -1001;
2394
2395 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2396 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2397 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2398
2399 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2400 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2401 mdev->p_uuid[UI_BITMAP] = 0UL;
2402
2403 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2404 *rule_nr = 35;
2405 } else {
2406 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2407 *rule_nr = 37;
2408 }
2409
2410 return -1;
2411 }
2412
2413 /* Common power [off|failure] */
2414 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2415 (mdev->p_uuid[UI_FLAGS] & 2);
2416 /* lowest bit is set when we were primary,
2417 * next bit (weight 2) is set when peer was primary */
2418 *rule_nr = 40;
2419
2420 switch (rct) {
2421 case 0: /* !self_pri && !peer_pri */ return 0;
2422 case 1: /* self_pri && !peer_pri */ return 1;
2423 case 2: /* !self_pri && peer_pri */ return -1;
2424 case 3: /* self_pri && peer_pri */
2425 dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2426 return dc ? -1 : 1;
2427 }
2428 }
2429
2430 *rule_nr = 50;
2431 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2432 if (self == peer)
2433 return -1;
2434
2435 *rule_nr = 51;
2436 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2437 if (self == peer) {
2438 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2439 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2440 if (self == peer) {
2441			/* The last P_SYNC_UUID did not get through. Undo the changes the peer
2442			   made to its UUIDs at its last start of resync as sync source. */
2443
2444 if (mdev->agreed_pro_version < 91)
2445 return -1001;
2446
2447 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2448 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2449 return -1;
2450 }
2451 }
2452
2453 *rule_nr = 60;
2454 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2455 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2456 peer = mdev->p_uuid[i] & ~((u64)1);
2457 if (self == peer)
2458 return -2;
2459 }
2460
2461 *rule_nr = 70;
2462 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2463 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2464 if (self == peer)
2465 return 1;
2466
2467 *rule_nr = 71;
2468 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2469 if (self == peer) {
2470 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2471 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2472 if (self == peer) {
2473			/* The last P_SYNC_UUID did not get through. Undo the changes we made
2474			   to our own UUIDs at our last start of resync as sync source. */
2475
2476 if (mdev->agreed_pro_version < 91)
2477 return -1001;
2478
2479 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2480 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2481
2482 dev_info(DEV, "Undid last start of resync:\n");
2483
2484 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2485 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2486
2487 return 1;
2488 }
2489 }
2490
2491
2492 *rule_nr = 80;
2493	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2494	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2495 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2496 if (self == peer)
2497 return 2;
2498 }
2499
2500 *rule_nr = 90;
2501 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2502 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2503 if (self == peer && self != ((u64)0))
2504 return 100;
2505
2506 *rule_nr = 100;
2507 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2508 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2509 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2510 peer = mdev->p_uuid[j] & ~((u64)1);
2511 if (self == peer)
2512 return -100;
2513 }
2514 }
2515
2516 return -1000;
2517}
2518
2519/* drbd_sync_handshake() returns the new conn state on success, or
2520 CONN_MASK (-1) on failure.
2521 */
2522static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2523 enum drbd_disk_state peer_disk) __must_hold(local)
2524{
2525 int hg, rule_nr;
2526 enum drbd_conns rv = C_MASK;
2527 enum drbd_disk_state mydisk;
2528
2529 mydisk = mdev->state.disk;
2530 if (mydisk == D_NEGOTIATING)
2531 mydisk = mdev->new_state_tmp.disk;
2532
2533 dev_info(DEV, "drbd_sync_handshake:\n");
2534 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2535 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2536 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2537
2538 hg = drbd_uuid_compare(mdev, &rule_nr);
2539
2540 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2541
2542 if (hg == -1000) {
2543 dev_alert(DEV, "Unrelated data, aborting!\n");
2544 return C_MASK;
2545 }
2546 if (hg == -1001) {
2547		dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2548 return C_MASK;
2549 }
2550
2551 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2552 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2553 int f = (hg == -100) || abs(hg) == 2;
2554 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2555 if (f)
2556 hg = hg*2;
2557 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2558 hg > 0 ? "source" : "target");
2559 }
2560
2561	if (abs(hg) == 100)
2562 drbd_khelper(mdev, "initial-split-brain");
2563
2564	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2565 int pcount = (mdev->state.role == R_PRIMARY)
2566 + (peer_role == R_PRIMARY);
2567 int forced = (hg == -100);
2568
2569 switch (pcount) {
2570 case 0:
2571 hg = drbd_asb_recover_0p(mdev);
2572 break;
2573 case 1:
2574 hg = drbd_asb_recover_1p(mdev);
2575 break;
2576 case 2:
2577 hg = drbd_asb_recover_2p(mdev);
2578 break;
2579 }
2580 if (abs(hg) < 100) {
2581 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2582 "automatically solved. Sync from %s node\n",
2583 pcount, (hg < 0) ? "peer" : "this");
2584 if (forced) {
2585 dev_warn(DEV, "Doing a full sync, since"
2586 " UUIDs where ambiguous.\n");
2587 hg = hg*2;
2588 }
2589 }
2590 }
2591
2592 if (hg == -100) {
2593 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2594 hg = -1;
2595 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2596 hg = 1;
2597
2598 if (abs(hg) < 100)
2599 dev_warn(DEV, "Split-Brain detected, manually solved. "
2600 "Sync from %s node\n",
2601 (hg < 0) ? "peer" : "this");
2602 }
2603
2604 if (hg == -100) {
2605		/* FIXME this log message is not correct if we end up here
2606 * after an attempted attach on a diskless node.
2607 * We just refuse to attach -- well, we drop the "connection"
2608 * to that disk, in a way... */
2609		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2610		drbd_khelper(mdev, "split-brain");
2611 return C_MASK;
2612 }
2613
2614 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2615 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2616 return C_MASK;
2617 }
2618
2619 if (hg < 0 && /* by intention we do not use mydisk here. */
2620 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2621 switch (mdev->net_conf->rr_conflict) {
2622 case ASB_CALL_HELPER:
2623 drbd_khelper(mdev, "pri-lost");
2624 /* fall through */
2625 case ASB_DISCONNECT:
2626 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2627 return C_MASK;
2628 case ASB_VIOLENTLY:
2629 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2630 "assumption\n");
2631 }
2632 }
2633
2634	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2635 if (hg == 0)
2636 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2637 else
2638 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2639 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2640 abs(hg) >= 2 ? "full" : "bit-map based");
2641 return C_MASK;
2642 }
2643
2644	if (abs(hg) >= 2) {
2645 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2646 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2647 return C_MASK;
2648 }
2649
2650 if (hg > 0) { /* become sync source. */
2651 rv = C_WF_BITMAP_S;
2652 } else if (hg < 0) { /* become sync target */
2653 rv = C_WF_BITMAP_T;
2654 } else {
2655 rv = C_CONNECTED;
2656 if (drbd_bm_total_weight(mdev)) {
2657 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2658 drbd_bm_total_weight(mdev));
2659 }
2660 }
2661
2662 return rv;
2663}
2664
2665/* returns 1 if invalid */
2666static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2667{
2668 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2669 if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2670 (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2671 return 0;
2672
2673 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2674 if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2675 self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2676 return 1;
2677
2678 /* everything else is valid if they are equal on both sides. */
2679 if (peer == self)
2680 return 0;
2681
2682 /* everything es is invalid. */
2683 return 1;
2684}
2685
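/* Compare the peer's net options (wire protocol, after-split-brain policies,
 * two-primaries, data-integrity-alg) against our own configuration and force
 * a disconnect on any mismatch. */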
2686static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2687{
2688 struct p_protocol *p = (struct p_protocol *)h;
2689 int header_size, data_size;
2690 int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2691	int p_want_lose, p_two_primaries, cf;
2692	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2693
2694 header_size = sizeof(*p) - sizeof(*h);
2695 data_size = h->length - header_size;
2696
2697 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2698 return FALSE;
2699
2700 p_proto = be32_to_cpu(p->protocol);
2701 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
2702 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
2703 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
2704	p_two_primaries = be32_to_cpu(p->two_primaries);
2705	cf		= be32_to_cpu(p->conn_flags);
2706 p_want_lose = cf & CF_WANT_LOSE;
2707
2708 clear_bit(CONN_DRY_RUN, &mdev->flags);
2709
2710 if (cf & CF_DRY_RUN)
2711 set_bit(CONN_DRY_RUN, &mdev->flags);
2712
2713 if (p_proto != mdev->net_conf->wire_protocol) {
2714 dev_err(DEV, "incompatible communication protocols\n");
2715 goto disconnect;
2716 }
2717
2718 if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2719 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2720 goto disconnect;
2721 }
2722
2723 if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2724 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2725 goto disconnect;
2726 }
2727
2728 if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2729 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2730 goto disconnect;
2731 }
2732
2733 if (p_want_lose && mdev->net_conf->want_lose) {
2734 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2735 goto disconnect;
2736 }
2737
2738 if (p_two_primaries != mdev->net_conf->two_primaries) {
2739 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2740 goto disconnect;
2741 }
2742
2743 if (mdev->agreed_pro_version >= 87) {
2744 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2745
2746 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2747 return FALSE;
2748
2749 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2750 if (strcmp(p_integrity_alg, my_alg)) {
2751 dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2752 goto disconnect;
2753 }
2754 dev_info(DEV, "data-integrity-alg: %s\n",
2755 my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2756 }
2757
2758 return TRUE;
2759
2760disconnect:
2761 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2762 return FALSE;
2763}
2764
2765/* helper function
2766 * input: alg name, feature name
2767 * return: NULL (alg name was "")
2768 * ERR_PTR(error) if something goes wrong
2769 * or the crypto hash ptr, if it worked out ok. */
2770struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2771 const char *alg, const char *name)
2772{
2773 struct crypto_hash *tfm;
2774
2775 if (!alg[0])
2776 return NULL;
2777
2778 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2779 if (IS_ERR(tfm)) {
2780 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2781 alg, name, PTR_ERR(tfm));
2782 return tfm;
2783 }
2784 if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2785 crypto_free_hash(tfm);
2786 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2787 return ERR_PTR(-EINVAL);
2788 }
2789 return tfm;
2790}
2791
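/* Receive the peer's syncer parameters: the resync rate, and for protocol
 * versions >= 88/89 the verify-alg and csums-alg names, which are allocated
 * here and installed under peer_seq_lock. */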
2792static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2793{
2794 int ok = TRUE;
2795 struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2796 unsigned int header_size, data_size, exp_max_sz;
2797 struct crypto_hash *verify_tfm = NULL;
2798 struct crypto_hash *csums_tfm = NULL;
2799 const int apv = mdev->agreed_pro_version;
2800
2801 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
2802 : apv == 88 ? sizeof(struct p_rs_param)
2803 + SHARED_SECRET_MAX
2804 : /* 89 */ sizeof(struct p_rs_param_89);
2805
2806 if (h->length > exp_max_sz) {
2807 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2808 h->length, exp_max_sz);
2809 return FALSE;
2810 }
2811
2812 if (apv <= 88) {
2813 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2814 data_size = h->length - header_size;
2815 } else /* apv >= 89 */ {
2816 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2817 data_size = h->length - header_size;
2818 D_ASSERT(data_size == 0);
2819 }
2820
2821 /* initialize verify_alg and csums_alg */
2822 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2823
2824 if (drbd_recv(mdev, h->payload, header_size) != header_size)
2825 return FALSE;
2826
2827 mdev->sync_conf.rate = be32_to_cpu(p->rate);
2828
2829 if (apv >= 88) {
2830 if (apv == 88) {
2831 if (data_size > SHARED_SECRET_MAX) {
2832 dev_err(DEV, "verify-alg too long, "
2833 "peer wants %u, accepting only %u byte\n",
2834 data_size, SHARED_SECRET_MAX);
2835 return FALSE;
2836 }
2837
2838 if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2839 return FALSE;
2840
2841 /* we expect NUL terminated string */
2842 /* but just in case someone tries to be evil */
2843 D_ASSERT(p->verify_alg[data_size-1] == 0);
2844 p->verify_alg[data_size-1] = 0;
2845
2846 } else /* apv >= 89 */ {
2847 /* we still expect NUL terminated strings */
2848 /* but just in case someone tries to be evil */
2849 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2850 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2851 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2852 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2853 }
2854
2855 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2856 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2857 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2858 mdev->sync_conf.verify_alg, p->verify_alg);
2859 goto disconnect;
2860 }
2861 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2862 p->verify_alg, "verify-alg");
2863 if (IS_ERR(verify_tfm)) {
2864 verify_tfm = NULL;
2865 goto disconnect;
2866 }
2867 }
2868
2869 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2870 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2871 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2872 mdev->sync_conf.csums_alg, p->csums_alg);
2873 goto disconnect;
2874 }
2875 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2876 p->csums_alg, "csums-alg");
2877 if (IS_ERR(csums_tfm)) {
2878 csums_tfm = NULL;
2879 goto disconnect;
2880 }
2881 }
2882
2883
2884 spin_lock(&mdev->peer_seq_lock);
2885 /* lock against drbd_nl_syncer_conf() */
2886 if (verify_tfm) {
2887 strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2888 mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2889 crypto_free_hash(mdev->verify_tfm);
2890 mdev->verify_tfm = verify_tfm;
2891 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2892 }
2893 if (csums_tfm) {
2894 strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2895 mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2896 crypto_free_hash(mdev->csums_tfm);
2897 mdev->csums_tfm = csums_tfm;
2898 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2899 }
2900 spin_unlock(&mdev->peer_seq_lock);
2901 }
2902
2903 return ok;
2904disconnect:
2905 /* just for completeness: actually not needed,
2906 * as this is not reached if csums_tfm was ok. */
2907 crypto_free_hash(csums_tfm);
2908 /* but free the verify_tfm again, if csums_tfm did not work out */
2909 crypto_free_hash(verify_tfm);
2910 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2911 return FALSE;
2912}
2913
2914static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2915{
2916 /* sorry, we currently have no working implementation
2917 * of distributed TCQ */
2918}
2919
2920/* warn if the arguments differ by more than 12.5% */
2921static void warn_if_differ_considerably(struct drbd_conf *mdev,
2922 const char *s, sector_t a, sector_t b)
2923{
2924 sector_t d;
2925 if (a == 0 || b == 0)
2926 return;
2927 d = (a > b) ? (a - b) : (b - a);
2928 if (d > (a>>3) || d > (b>>3))
2929 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2930 (unsigned long long)a, (unsigned long long)b);
2931}
2932
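/* Receive the peer's backing device size and user-requested size, sanity
 * check them against our local device, possibly resize, and adjust the
 * maximum segment size of the request queue. */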
2933static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2934{
2935 struct p_sizes *p = (struct p_sizes *)h;
2936 enum determine_dev_size dd = unchanged;
2937 unsigned int max_seg_s;
2938 sector_t p_size, p_usize, my_usize;
2939 int ldsc = 0; /* local disk size changed */
2940	enum dds_flags ddsf;
2941
2942 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2943 if (drbd_recv(mdev, h->payload, h->length) != h->length)
2944 return FALSE;
2945
2946 p_size = be64_to_cpu(p->d_size);
2947 p_usize = be64_to_cpu(p->u_size);
2948
2949 if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2950 dev_err(DEV, "some backing storage is needed\n");
2951 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2952 return FALSE;
2953 }
2954
2955 /* just store the peer's disk size for now.
2956 * we still need to figure out whether we accept that. */
2957 mdev->p_size = p_size;
2958
2959#define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min(l, r)))
2960 if (get_ldev(mdev)) {
2961 warn_if_differ_considerably(mdev, "lower level device sizes",
2962 p_size, drbd_get_max_capacity(mdev->ldev));
2963 warn_if_differ_considerably(mdev, "user requested size",
2964 p_usize, mdev->ldev->dc.disk_size);
2965
2966 /* if this is the first connect, or an otherwise expected
2967 * param exchange, choose the minimum */
2968 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2969 p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2970 p_usize);
2971
2972 my_usize = mdev->ldev->dc.disk_size;
2973
2974 if (mdev->ldev->dc.disk_size != p_usize) {
2975 mdev->ldev->dc.disk_size = p_usize;
2976 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2977 (unsigned long)mdev->ldev->dc.disk_size);
2978 }
2979
2980 /* Never shrink a device with usable data during connect.
2981 But allow online shrinking if we are connected. */
2982		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2983		    drbd_get_capacity(mdev->this_bdev) &&
2984 mdev->state.disk >= D_OUTDATED &&
2985 mdev->state.conn < C_CONNECTED) {
2986 dev_err(DEV, "The peer's disk size is too small!\n");
2987 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2988 mdev->ldev->dc.disk_size = my_usize;
2989 put_ldev(mdev);
2990 return FALSE;
2991 }
2992 put_ldev(mdev);
2993 }
2994#undef min_not_zero
2995
2996	ddsf = be16_to_cpu(p->dds_flags);
2997	if (get_ldev(mdev)) {
2998		dd = drbd_determin_dev_size(mdev, ddsf);
2999		put_ldev(mdev);
3000 if (dd == dev_size_error)
3001 return FALSE;
3002 drbd_md_sync(mdev);
3003 } else {
3004 /* I am diskless, need to accept the peer's size. */
3005 drbd_set_my_capacity(mdev, p_size);
3006 }
3007
3008	if (get_ldev(mdev)) {
3009 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3010 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3011 ldsc = 1;
3012 }
3013
3014		if (mdev->agreed_pro_version < 94)
3015 max_seg_s = be32_to_cpu(p->max_segment_size);
3016 else /* drbd 8.3.8 onwards */
3017 max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3018
3019		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3020 drbd_setup_queue_param(mdev, max_seg_s);
3021
3022		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3023		put_ldev(mdev);
3024 }
3025
3026 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3027 if (be64_to_cpu(p->c_size) !=
3028 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3029 /* we have different sizes, probably peer
3030 * needs to know my new size... */
3031			drbd_send_sizes(mdev, 0, ddsf);
3032		}
3033 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3034 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3035 if (mdev->state.pdsk >= D_INCONSISTENT &&
3036			    mdev->state.disk >= D_INCONSISTENT) {
3037 if (ddsf & DDSF_NO_RESYNC)
3038 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3039 else
3040 resync_after_online_grow(mdev);
3041 } else
3042				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3043 }
3044 }
3045
3046 return TRUE;
3047}
3048
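/* Store the peer's UUIDs. If we are Connected, our current UUID is still
 * "just created" and the peer's flags request it, skip the initial sync by
 * clearing the bitmap and going straight to UpToDate/UpToDate. */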
3049static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3050{
3051 struct p_uuids *p = (struct p_uuids *)h;
3052 u64 *p_uuid;
3053 int i;
3054
3055 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3056 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3057 return FALSE;
3058
3059 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3060
3061 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3062 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3063
3064 kfree(mdev->p_uuid);
3065 mdev->p_uuid = p_uuid;
3066
3067 if (mdev->state.conn < C_CONNECTED &&
3068 mdev->state.disk < D_INCONSISTENT &&
3069 mdev->state.role == R_PRIMARY &&
3070 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3071 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3072 (unsigned long long)mdev->ed_uuid);
3073 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3074 return FALSE;
3075 }
3076
3077 if (get_ldev(mdev)) {
3078 int skip_initial_sync =
3079 mdev->state.conn == C_CONNECTED &&
3080 mdev->agreed_pro_version >= 90 &&
3081 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3082 (p_uuid[UI_FLAGS] & 8);
3083 if (skip_initial_sync) {
3084 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3085 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3086 "clear_n_write from receive_uuids");
3087 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3088 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3089 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3090 CS_VERBOSE, NULL);
3091 drbd_md_sync(mdev);
3092 }
3093 put_ldev(mdev);
3094 }
3095
3096	/* Before we test for the disk state, we should wait until any possibly
3097	   ongoing cluster-wide state change is finished. That is important if
3098 we are primary and are detaching from our disk. We need to see the
3099 new disk state... */
3100 wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3101 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3102 drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3103
3104 return TRUE;
3105}
3106
3107/**
3108 * convert_state() - Converts the peer's view of the cluster state to our point of view
3109 * @ps: The state as seen by the peer.
3110 */
3111static union drbd_state convert_state(union drbd_state ps)
3112{
3113 union drbd_state ms;
3114
3115 static enum drbd_conns c_tab[] = {
3116 [C_CONNECTED] = C_CONNECTED,
3117
3118 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3119 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3120 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3121 [C_VERIFY_S] = C_VERIFY_T,
3122 [C_MASK] = C_MASK,
3123 };
3124
3125 ms.i = ps.i;
3126
3127 ms.conn = c_tab[ps.conn];
3128 ms.peer = ps.role;
3129 ms.role = ps.peer;
3130 ms.pdsk = ps.disk;
3131 ms.disk = ps.pdsk;
3132 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3133
3134 return ms;
3135}
3136
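/* The peer requests a cluster-wide state change: convert it to our point of
 * view, try to apply it, and send the result back as a state-change reply. */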
3137static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3138{
3139 struct p_req_state *p = (struct p_req_state *)h;
3140 union drbd_state mask, val;
3141 int rv;
3142
3143 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3144 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3145 return FALSE;
3146
3147 mask.i = be32_to_cpu(p->mask);
3148 val.i = be32_to_cpu(p->val);
3149
3150 if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3151 test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3152 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3153 return TRUE;
3154 }
3155
3156 mask = convert_state(mask);
3157 val = convert_state(val);
3158
3159 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3160
3161 drbd_send_sr_reply(mdev, rv);
3162 drbd_md_sync(mdev);
3163
3164 return TRUE;
3165}
3166
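/* Process the peer's current state: decide whether a resync handshake is
 * needed, and fold connection, peer role and peer disk state into our own
 * state. */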
3167static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3168{
3169 struct p_state *p = (struct p_state *)h;
3170 enum drbd_conns nconn, oconn;
3171 union drbd_state ns, peer_state;
3172 enum drbd_disk_state real_peer_disk;
3173 int rv;
3174
3175 ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3176 return FALSE;
3177
3178 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3179 return FALSE;
3180
3181 peer_state.i = be32_to_cpu(p->state);
3182
3183 real_peer_disk = peer_state.disk;
3184 if (peer_state.disk == D_NEGOTIATING) {
3185 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3186 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3187 }
3188
3189 spin_lock_irq(&mdev->req_lock);
3190 retry:
3191 oconn = nconn = mdev->state.conn;
3192 spin_unlock_irq(&mdev->req_lock);
3193
3194 if (nconn == C_WF_REPORT_PARAMS)
3195 nconn = C_CONNECTED;
3196
3197 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3198 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3199 int cr; /* consider resync */
3200
3201 /* if we established a new connection */
3202 cr = (oconn < C_CONNECTED);
3203 /* if we had an established connection
3204 * and one of the nodes newly attaches a disk */
3205 cr |= (oconn == C_CONNECTED &&
3206 (peer_state.disk == D_NEGOTIATING ||
3207 mdev->state.disk == D_NEGOTIATING));
3208 /* if we have both been inconsistent, and the peer has been
3209 * forced to be UpToDate with --overwrite-data */
3210 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3211 /* if we had been plain connected, and the admin requested to
3212 * start a sync by "invalidate" or "invalidate-remote" */
3213 cr |= (oconn == C_CONNECTED &&
3214 (peer_state.conn >= C_STARTING_SYNC_S &&
3215 peer_state.conn <= C_WF_BITMAP_T));
3216
3217 if (cr)
3218 nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3219
3220 put_ldev(mdev);
3221 if (nconn == C_MASK) {
3222			nconn = C_CONNECTED;
3223			if (mdev->state.disk == D_NEGOTIATING) {
3224 drbd_force_state(mdev, NS(disk, D_DISKLESS));
3225			} else if (peer_state.disk == D_NEGOTIATING) {
3226 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3227 peer_state.disk = D_DISKLESS;
3228				real_peer_disk = D_DISKLESS;
3229			} else {
3230				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3231 return FALSE;
3232				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3233 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3234 return FALSE;
3235 }
3236 }
3237 }
3238
3239 spin_lock_irq(&mdev->req_lock);
3240 if (mdev->state.conn != oconn)
3241 goto retry;
3242 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3243 ns.i = mdev->state.i;
3244 ns.conn = nconn;
3245 ns.peer = peer_state.role;
3246 ns.pdsk = real_peer_disk;
3247 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3248 if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3249 ns.disk = mdev->new_state_tmp.disk;
3250
3251 rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3252 ns = mdev->state;
3253 spin_unlock_irq(&mdev->req_lock);
3254
3255 if (rv < SS_SUCCESS) {
3256 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3257 return FALSE;
3258 }
3259
3260 if (oconn > C_WF_REPORT_PARAMS) {
3261 if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3262		    peer_state.disk != D_NEGOTIATING) {
3263 /* we want resync, peer has not yet decided to sync... */
3264 /* Nowadays only used when forcing a node into primary role and
3265 setting its disk to UpToDate with that */
3266 drbd_send_uuids(mdev);
3267 drbd_send_state(mdev);
3268 }
3269 }
3270
3271 mdev->net_conf->want_lose = 0;
3272
3273 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3274
3275 return TRUE;
3276}
3277
3278static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3279{
3280 struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3281
3282 wait_event(mdev->misc_wait,
3283 mdev->state.conn == C_WF_SYNC_UUID ||
3284 mdev->state.conn < C_CONNECTED ||
3285 mdev->state.disk < D_NEGOTIATING);
3286
3287 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3288
3289 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3290 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3291 return FALSE;
3292
3293 /* Here the _drbd_uuid_ functions are right, current should
3294 _not_ be rotated into the history */
3295 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3296 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3297 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3298
3299 drbd_start_resync(mdev, C_SYNC_TARGET);
3300
3301 put_ldev(mdev);
3302 } else
3303 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3304
3305 return TRUE;
3306}
3307
3308enum receive_bitmap_ret { OK, DONE, FAILED };
3309
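/* Merge one uncompressed bitmap packet into the local bitmap.
 * An empty packet (no words left to transfer) signals DONE. */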
3310static enum receive_bitmap_ret
3311receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3312 unsigned long *buffer, struct bm_xfer_ctx *c)
3313{
3314 unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3315 unsigned want = num_words * sizeof(long);
3316
3317 if (want != h->length) {
3318 dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3319 return FAILED;
3320 }
3321 if (want == 0)
3322 return DONE;
3323 if (drbd_recv(mdev, buffer, want) != want)
3324 return FAILED;
3325
3326 drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3327
3328 c->word_offset += num_words;
3329 c->bit_offset = c->word_offset * BITS_PER_LONG;
3330 if (c->bit_offset > c->bm_bits)
3331 c->bit_offset = c->bm_bits;
3332
3333 return OK;
3334}
3335
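/* Decode a VLI run-length encoded bitmap chunk: the payload is a sequence of
 * run lengths for alternating ranges of clear and set bits, starting with the
 * polarity given by DCBP_get_start(). */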
3336static enum receive_bitmap_ret
3337recv_bm_rle_bits(struct drbd_conf *mdev,
3338 struct p_compressed_bm *p,
3339 struct bm_xfer_ctx *c)
3340{
3341 struct bitstream bs;
3342 u64 look_ahead;
3343 u64 rl;
3344 u64 tmp;
3345 unsigned long s = c->bit_offset;
3346 unsigned long e;
3347 int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3348 int toggle = DCBP_get_start(p);
3349 int have;
3350 int bits;
3351
3352 bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3353
3354 bits = bitstream_get_bits(&bs, &look_ahead, 64);
3355 if (bits < 0)
3356 return FAILED;
3357
3358 for (have = bits; have > 0; s += rl, toggle = !toggle) {
3359 bits = vli_decode_bits(&rl, look_ahead);
3360 if (bits <= 0)
3361 return FAILED;
3362
3363 if (toggle) {
3364 e = s + rl -1;
3365 if (e >= c->bm_bits) {
3366 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3367 return FAILED;
3368 }
3369 _drbd_bm_set_bits(mdev, s, e);
3370 }
3371
3372 if (have < bits) {
3373 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3374 have, bits, look_ahead,
3375 (unsigned int)(bs.cur.b - p->code),
3376 (unsigned int)bs.buf_len);
3377 return FAILED;
3378 }
3379 look_ahead >>= bits;
3380 have -= bits;
3381
3382 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3383 if (bits < 0)
3384 return FAILED;
3385 look_ahead |= tmp << have;
3386 have += bits;
3387 }
3388
3389 c->bit_offset = s;
3390 bm_xfer_ctx_bit_to_word_offset(c);
3391
3392 return (s == c->bm_bits) ? DONE : OK;
3393}
3394
3395static enum receive_bitmap_ret
3396decode_bitmap_c(struct drbd_conf *mdev,
3397 struct p_compressed_bm *p,
3398 struct bm_xfer_ctx *c)
3399{
3400 if (DCBP_get_code(p) == RLE_VLI_Bits)
3401 return recv_bm_rle_bits(mdev, p, c);
3402
3403 /* other variants had been implemented for evaluation,
3404 * but have been dropped as this one turned out to be "best"
3405 * during all our tests. */
3406
3407 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3408 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3409 return FAILED;
3410}
3411
3412void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3413 const char *direction, struct bm_xfer_ctx *c)
3414{
3415 /* what would it take to transfer it "plaintext" */
3416 unsigned plain = sizeof(struct p_header) *
3417 ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3418 + c->bm_words * sizeof(long);
3419 unsigned total = c->bytes[0] + c->bytes[1];
3420 unsigned r;
3421
3422 /* total can not be zero. but just in case: */
3423 if (total == 0)
3424 return;
3425
3426 /* don't report if not compressed */
3427 if (total >= plain)
3428 return;
3429
3430 /* total < plain. check for overflow, still */
3431 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3432 : (1000 * total / plain);
3433
3434 if (r > 1000)
3435 r = 1000;
3436
3437 r = 1000 - r;
3438 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3439 "total %u; compression: %u.%u%%\n",
3440 direction,
3441 c->bytes[1], c->packets[1],
3442 c->bytes[0], c->packets[0],
3443 total, r/10, r % 10);
3444}
3445
3446/* Since we are processing the bitfield from lower addresses to higher,
3447   it does not matter if we process it in 32 bit chunks or 64 bit
3448 chunks as long as it is little endian. (Understand it as byte stream,
3449 beginning with the lowest byte...) If we would use big endian
3450 we would need to process it from the highest address to the lowest,
3451 in order to be agnostic to the 32 vs 64 bits issue.
3452
3453 returns 0 on failure, 1 if we successfully received it. */
3454static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3455{
3456 struct bm_xfer_ctx c;
3457 void *buffer;
3458 enum receive_bitmap_ret ret;
3459 int ok = FALSE;
3460
3461 wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3462
3463 drbd_bm_lock(mdev, "receive bitmap");
3464
3465 /* maybe we should use some per thread scratch page,
3466 * and allocate that during initial device creation? */
3467 buffer = (unsigned long *) __get_free_page(GFP_NOIO);
3468 if (!buffer) {
3469 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3470 goto out;
3471 }
3472
3473 c = (struct bm_xfer_ctx) {
3474 .bm_bits = drbd_bm_bits(mdev),
3475 .bm_words = drbd_bm_words(mdev),
3476 };
3477
3478 do {
3479 if (h->command == P_BITMAP) {
3480 ret = receive_bitmap_plain(mdev, h, buffer, &c);
3481 } else if (h->command == P_COMPRESSED_BITMAP) {
3482 /* MAYBE: sanity check that we speak proto >= 90,
3483 * and the feature is enabled! */
3484 struct p_compressed_bm *p;
3485
3486 if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3487 dev_err(DEV, "ReportCBitmap packet too large\n");
3488 goto out;
3489 }
3490 /* use the page buff */
3491 p = buffer;
3492 memcpy(p, h, sizeof(*h));
3493 if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3494 goto out;
3495 if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3496 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3497 goto out; /* use the common exit path: unlock the bitmap and free the page buffer */
3498 }
3499 ret = decode_bitmap_c(mdev, p, &c);
3500 } else {
3501 dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3502 goto out;
3503 }
3504
3505 c.packets[h->command == P_BITMAP]++;
3506 c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3507
3508 if (ret != OK)
3509 break;
3510
3511 if (!drbd_recv_header(mdev, h))
3512 goto out;
3513 } while (ret == OK);
3514 if (ret == FAILED)
3515 goto out;
3516
3517 INFO_bm_xfer_stats(mdev, "receive", &c);
3518
3519 if (mdev->state.conn == C_WF_BITMAP_T) {
3520 ok = !drbd_send_bitmap(mdev);
3521 if (!ok)
3522 goto out;
3523 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3524 ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3525 D_ASSERT(ok == SS_SUCCESS);
3526 } else if (mdev->state.conn != C_WF_BITMAP_S) {
3527 /* admin may have requested C_DISCONNECTING,
3528 * other threads may have noticed network errors */
3529 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3530 drbd_conn_str(mdev->state.conn));
3531 }
3532
3533 ok = TRUE;
3534 out:
3535 drbd_bm_unlock(mdev);
3536 if (ok && mdev->state.conn == C_WF_BITMAP_S)
3537 drbd_start_resync(mdev, C_SYNC_SOURCE);
3538 free_page((unsigned long) buffer);
3539 return ok;
3540}
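
/*
 * Illustrative sketch, not DRBD code: receive_bitmap() above indexes
 * its byte/packet counters with the boolean (h->command == P_BITMAP),
 * so bucket 1 accumulates plain bitmap traffic and bucket 0 the
 * RLE-compressed traffic.  The struct and names below are invented.
 */
struct xfer_counters {
	unsigned packets[2];	/* [0] compressed, [1] plain */
	unsigned bytes[2];
};

static void account_packet(struct xfer_counters *c, int is_plain,
			   unsigned header_bytes, unsigned payload_bytes)
{
	c->packets[!!is_plain]++;
	c->bytes[!!is_plain] += header_bytes + payload_bytes;
}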
3541
3542static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3543{
3544 /* TODO zero copy sink :) */
3545 static char sink[128];
3546 int size, want, r;
3547
3548 dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3549 h->command, h->length);
3550
3551 size = h->length;
3552 while (size > 0) {
3553 want = min_t(int, size, sizeof(sink));
3554 r = drbd_recv(mdev, sink, want);
3555 ERR_IF(r <= 0) break;
3556 size -= r;
3557 }
3558 return size == 0;
3559}
3560
3561static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3562{
3563 if (mdev->state.disk >= D_INCONSISTENT)
3564 drbd_kick_lo(mdev);
3565
3566 /* Make sure we've acked all the TCP data associated
3567 * with the data requests being unplugged */
3568 drbd_tcp_quickack(mdev->data.socket);
3569
3570 return TRUE;
3571}
3572
3573static void timeval_sub_us(struct timeval* tv, unsigned int us)
3574{
3575 tv->tv_sec -= us / 1000000;
3576 us = us % 1000000;
3577 if (tv->tv_usec < us) { /* usec would underflow: borrow one second */
3578 tv->tv_usec += 1000000;
3579 tv->tv_sec--;
3580 }
3581 tv->tv_usec -= us;
3582}
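
/*
 * Illustrative sketch, not DRBD code: subtracting an offset given in
 * microseconds must borrow one second whenever the usec field would
 * underflow.  Userspace version using struct timeval from <sys/time.h>.
 */
#include <sys/time.h>

static void tv_sub_us(struct timeval *tv, unsigned int us)
{
	tv->tv_sec -= us / 1000000;
	us %= 1000000;
	if (tv->tv_usec < (suseconds_t)us) {	/* would go negative: borrow */
		tv->tv_usec += 1000000;
		tv->tv_sec--;
	}
	tv->tv_usec -= us;
}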
3583
3584static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3585{
3586 struct delay_probe *dp;
3587 struct list_head *le;
3588 struct timeval now;
3589 int seq_num;
3590 int offset;
3591 int data_delay;
3592
3593 seq_num = be32_to_cpu(p->seq_num);
3594 offset = be32_to_cpu(p->offset);
3595
3596 spin_lock(&mdev->peer_seq_lock);
3597 if (!list_empty(&mdev->delay_probes)) {
3598 if (from == USE_DATA_SOCKET)
3599 le = mdev->delay_probes.next;
3600 else
3601 le = mdev->delay_probes.prev;
3602
3603 dp = list_entry(le, struct delay_probe, list);
3604
3605 if (dp->seq_num == seq_num) {
3606 list_del(le);
3607 spin_unlock(&mdev->peer_seq_lock);
3608 do_gettimeofday(&now);
3609 timeval_sub_us(&now, offset);
3610 data_delay =
3611 now.tv_usec - dp->time.tv_usec +
3612 (now.tv_sec - dp->time.tv_sec) * 1000000;
3613
3614 if (data_delay > 0)
3615 mdev->data_delay = data_delay;
3616
3617 kfree(dp);
3618 return;
3619 }
3620
3621 if (dp->seq_num > seq_num) {
3622 spin_unlock(&mdev->peer_seq_lock);
3623 dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3624 return; /* Do not allocate a struct delay_probe. */
3625 }
3626 }
3627 spin_unlock(&mdev->peer_seq_lock);
3628
3629 dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3630 if (!dp) {
3631 dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3632 return;
3633 }
3634
3635 dp->seq_num = seq_num;
3636 do_gettimeofday(&dp->time);
3637 timeval_sub_us(&dp->time, offset);
3638
3639 spin_lock(&mdev->peer_seq_lock);
3640 if (from == USE_DATA_SOCKET)
3641 list_add(&dp->list, &mdev->delay_probes);
3642 else
3643 list_add_tail(&dp->list, &mdev->delay_probes);
3644 spin_unlock(&mdev->peer_seq_lock);
3645}
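
/*
 * Illustrative sketch, not DRBD code: once got_delay_probe() above has
 * matched a probe by sequence number, the data-path delay is just the
 * timestamp difference in microseconds (kept only if positive).
 */
#include <sys/time.h>

static long tv_diff_us(const struct timeval *later, const struct timeval *earlier)
{
	return (later->tv_sec - earlier->tv_sec) * 1000000L +
	       (later->tv_usec - earlier->tv_usec);
}
/* usage: delay = tv_diff_us(&now, &dp->time); use it only when delay > 0 */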
3646
3647static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3648{
3649 struct p_delay_probe *p = (struct p_delay_probe *)h;
3650
3651 ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3652 if (drbd_recv(mdev, h->payload, h->length) != h->length)
3653 return FALSE;
3654
3655 got_delay_probe(mdev, USE_DATA_SOCKET, p);
3656 return TRUE;
3657}
3658
3659typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3660
3661static drbd_cmd_handler_f drbd_default_handler[] = {
3662 [P_DATA] = receive_Data,
3663 [P_DATA_REPLY] = receive_DataReply,
3664 [P_RS_DATA_REPLY] = receive_RSDataReply,
3665 [P_BARRIER] = receive_Barrier,
3666 [P_BITMAP] = receive_bitmap,
3667 [P_COMPRESSED_BITMAP] = receive_bitmap,
3668 [P_UNPLUG_REMOTE] = receive_UnplugRemote,
3669 [P_DATA_REQUEST] = receive_DataRequest,
3670 [P_RS_DATA_REQUEST] = receive_DataRequest,
3671 [P_SYNC_PARAM] = receive_SyncParam,
3672 [P_SYNC_PARAM89] = receive_SyncParam,
3673 [P_PROTOCOL] = receive_protocol,
3674 [P_UUIDS] = receive_uuids,
3675 [P_SIZES] = receive_sizes,
3676 [P_STATE] = receive_state,
3677 [P_STATE_CHG_REQ] = receive_req_state,
3678 [P_SYNC_UUID] = receive_sync_uuid,
3679 [P_OV_REQUEST] = receive_DataRequest,
3680 [P_OV_REPLY] = receive_DataRequest,
3681 [P_CSUM_RS_REQUEST] = receive_DataRequest,
3682 [P_DELAY_PROBE] = receive_delay_probe,
3683 /* anything missing from this table is in
3684 * the asender_tbl, see get_asender_cmd */
3685 [P_MAX_CMD] = NULL,
3686};
3687
3688static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3689static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3690
3691static void drbdd(struct drbd_conf *mdev)
3692{
3693 drbd_cmd_handler_f handler;
3694 struct p_header *header = &mdev->data.rbuf.header;
3695
3696 while (get_t_state(&mdev->receiver) == Running) {
3697 drbd_thread_current_set_cpu(mdev);
3698 if (!drbd_recv_header(mdev, header)) {
3699 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3700 break;
3701 }
3702
3703 if (header->command < P_MAX_CMD)
3704 handler = drbd_cmd_handler[header->command];
3705 else if (P_MAY_IGNORE < header->command
3706 && header->command < P_MAX_OPT_CMD)
3707 handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3708 else if (header->command > P_MAX_OPT_CMD)
3709 handler = receive_skip;
3710 else
3711 handler = NULL;
3712
3713 if (unlikely(!handler)) {
3714 dev_err(DEV, "unknown packet type %d, l: %d!\n",
3715 header->command, header->length);
3716 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3717 break;
3718 }
3719 if (unlikely(!handler(mdev, header))) {
3720 dev_err(DEV, "error receiving %s, l: %d!\n",
3721 cmdname(header->command), header->length);
3722 drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3723 break;
3724 }
3725 }
3726}
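
/*
 * Illustrative sketch, not DRBD code: drbdd() above dispatches each
 * received header through a function pointer table indexed by the
 * command number.  Command numbers, handler names and the context
 * struct below are invented for the example.
 */
#include <stddef.h>

struct conn_ctx;		/* opaque per-connection context */
typedef int (*cmd_handler_fn)(struct conn_ctx *, const void *payload, int len);

enum { CMD_DATA, CMD_ACK, CMD_MAX };

static int handle_data(struct conn_ctx *c, const void *p, int len)
{ (void)c; (void)p; (void)len; return 1; }
static int handle_ack(struct conn_ctx *c, const void *p, int len)
{ (void)c; (void)p; (void)len; return 1; }

static const cmd_handler_fn cmd_table[CMD_MAX] = {
	[CMD_DATA] = handle_data,	/* designated initializers, as above */
	[CMD_ACK]  = handle_ack,
};

/* returns 1 on success, 0 on unknown command or handler failure */
static int dispatch(struct conn_ctx *c, unsigned cmd, const void *payload, int len)
{
	if (cmd >= CMD_MAX || !cmd_table[cmd])
		return 0;
	return cmd_table[cmd](c, payload, len);
}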
3727
3728static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3729{
3730 struct hlist_head *slot;
3731 struct hlist_node *pos;
3732 struct hlist_node *tmp;
3733 struct drbd_request *req;
3734 int i;
3735
3736 /*
3737 * Application READ requests
3738 */
3739 spin_lock_irq(&mdev->req_lock);
3740 for (i = 0; i < APP_R_HSIZE; i++) {
3741 slot = mdev->app_reads_hash+i;
3742 hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3743 /* it may (but should not any longer!)
3744 * be on the work queue; if that assert triggers,
3745 * we need to also grab the
3746 * spin_lock_irq(&mdev->data.work.q_lock);
3747 * and list_del_init here. */
3748 D_ASSERT(list_empty(&req->w.list));
3749 /* It would be nice to complete outside of spinlock.
3750 * But this is easier for now. */
3751 _req_mod(req, connection_lost_while_pending);
3752 }
3753 }
3754 for (i = 0; i < APP_R_HSIZE; i++)
3755 if (!hlist_empty(mdev->app_reads_hash+i))
3756 dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3757 "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3758
3759 memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3760 spin_unlock_irq(&mdev->req_lock);
3761}
3762
3763void drbd_flush_workqueue(struct drbd_conf *mdev)
3764{
3765 struct drbd_wq_barrier barr;
3766
3767 barr.w.cb = w_prev_work_done;
3768 init_completion(&barr.done);
3769 drbd_queue_work(&mdev->data.work, &barr.w);
3770 wait_for_completion(&barr.done);
3771}
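
/*
 * Illustrative sketch, not DRBD code: drbd_flush_workqueue() above
 * queues a barrier work item whose only job is to signal a completion;
 * waiting on it guarantees all previously queued work has run.  Below
 * is a userspace one-shot "completion" built on pthreads.
 */
#include <pthread.h>
#include <stdbool.h>

struct one_shot_completion {
	pthread_mutex_t lock;
	pthread_cond_t  cond;
	bool done;
};

static void osc_init(struct one_shot_completion *c)
{
	pthread_mutex_init(&c->lock, NULL);
	pthread_cond_init(&c->cond, NULL);
	c->done = false;
}

/* the barrier work item calls this from the worker thread */
static void osc_complete(struct one_shot_completion *c)
{
	pthread_mutex_lock(&c->lock);
	c->done = true;
	pthread_cond_broadcast(&c->cond);
	pthread_mutex_unlock(&c->lock);
}

/* the flusher blocks here until the barrier item has been processed */
static void osc_wait(struct one_shot_completion *c)
{
	pthread_mutex_lock(&c->lock);
	while (!c->done)
		pthread_cond_wait(&c->cond, &c->lock);
	pthread_mutex_unlock(&c->lock);
}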
3772
3773static void drbd_disconnect(struct drbd_conf *mdev)
3774{
3775 enum drbd_fencing_p fp;
3776 union drbd_state os, ns;
3777 int rv = SS_UNKNOWN_ERROR;
3778 unsigned int i;
3779
3780 if (mdev->state.conn == C_STANDALONE)
3781 return;
3782 if (mdev->state.conn >= C_WF_CONNECTION)
3783 dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3784 drbd_conn_str(mdev->state.conn));
3785
3786 /* asender does not clean up anything. it must not interfere, either */
3787 drbd_thread_stop(&mdev->asender);
3788 drbd_free_sock(mdev);
3789
3790 spin_lock_irq(&mdev->req_lock);
3791 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3792 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3793 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3794 spin_unlock_irq(&mdev->req_lock);
3795
3796 /* We do not have data structures that would allow us to
3797 * get the rs_pending_cnt down to 0 again.
3798 * * On C_SYNC_TARGET we do not have any data structures describing
3799 * the pending RSDataRequest's we have sent.
3800 * * On C_SYNC_SOURCE there is no data structure that tracks
3801 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3802 * And no, it is not the sum of the reference counts in the
3803 * resync_LRU. The resync_LRU tracks the whole operation including
3804 * the disk-IO, while the rs_pending_cnt only tracks the blocks
3805 * on the fly. */
3806 drbd_rs_cancel_all(mdev);
3807 mdev->rs_total = 0;
3808 mdev->rs_failed = 0;
3809 atomic_set(&mdev->rs_pending_cnt, 0);
3810 wake_up(&mdev->misc_wait);
3811
3812 /* make sure syncer is stopped and w_resume_next_sg queued */
3813 del_timer_sync(&mdev->resync_timer);
3814 set_bit(STOP_SYNC_TIMER, &mdev->flags);
3815 resync_timer_fn((unsigned long)mdev);
3816
3817 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3818 * w_make_resync_request etc. which may still be on the worker queue
3819 * to be "canceled" */
3820 drbd_flush_workqueue(mdev);
3821
3822 /* This also does reclaim_net_ee(). If we do this too early, we might
3823 * miss some resync ee and pages.*/
3824 drbd_process_done_ee(mdev);
3825
3826 kfree(mdev->p_uuid);
3827 mdev->p_uuid = NULL;
3828
3829 if (!mdev->state.susp)
3830 tl_clear(mdev);
3831
3832 drbd_fail_pending_reads(mdev);
3833
3834 dev_info(DEV, "Connection closed\n");
3835
3836 drbd_md_sync(mdev);
3837
3838 fp = FP_DONT_CARE;
3839 if (get_ldev(mdev)) {
3840 fp = mdev->ldev->dc.fencing;
3841 put_ldev(mdev);
3842 }
3843
3844 if (mdev->state.role == R_PRIMARY) {
3845 if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3846 enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3847 drbd_request_state(mdev, NS(pdsk, nps));
3848 }
3849 }
3850
3851 spin_lock_irq(&mdev->req_lock);
3852 os = mdev->state;
3853 if (os.conn >= C_UNCONNECTED) {
3854 /* Do not restart in case we are C_DISCONNECTING */
3855 ns = os;
3856 ns.conn = C_UNCONNECTED;
3857 rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3858 }
3859 spin_unlock_irq(&mdev->req_lock);
3860
3861 if (os.conn == C_DISCONNECTING) {
3862 struct hlist_head *h;
3863 wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3864
3865 /* we must not free the tl_hash
3866 * while application io is still on the fly */
3867 wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3868
3869 spin_lock_irq(&mdev->req_lock);
3870 /* paranoia code */
3871 for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3872 if (h->first)
3873 dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3874 (int)(h - mdev->ee_hash), h->first);
3875 kfree(mdev->ee_hash);
3876 mdev->ee_hash = NULL;
3877 mdev->ee_hash_s = 0;
3878
3879 /* paranoia code */
3880 for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3881 if (h->first)
3882 dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3883 (int)(h - mdev->tl_hash), h->first);
3884 kfree(mdev->tl_hash);
3885 mdev->tl_hash = NULL;
3886 mdev->tl_hash_s = 0;
3887 spin_unlock_irq(&mdev->req_lock);
3888
3889 crypto_free_hash(mdev->cram_hmac_tfm);
3890 mdev->cram_hmac_tfm = NULL;
3891
3892 kfree(mdev->net_conf);
3893 mdev->net_conf = NULL;
3894 drbd_request_state(mdev, NS(conn, C_STANDALONE));
3895 }
3896
3897 /* tcp_close and release of sendpage pages can be deferred. I don't
3898 * want to use SO_LINGER, because apparently it can be deferred for
3899 * more than 20 seconds (longest time I checked).
3900 *
3901 * Actually we don't care for exactly when the network stack does its
3902 * put_page(), but release our reference on these pages right here.
3903 */
3904 i = drbd_release_ee(mdev, &mdev->net_ee);
3905 if (i)
3906 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3907 i = atomic_read(&mdev->pp_in_use);
3908 if (i)
3909 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3910
3911 D_ASSERT(list_empty(&mdev->read_ee));
3912 D_ASSERT(list_empty(&mdev->active_ee));
3913 D_ASSERT(list_empty(&mdev->sync_ee));
3914 D_ASSERT(list_empty(&mdev->done_ee));
3915
3916 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3917 atomic_set(&mdev->current_epoch->epoch_size, 0);
3918 D_ASSERT(list_empty(&mdev->current_epoch->list));
3919}
3920
3921/*
3922 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3923 * we can agree on is stored in agreed_pro_version.
3924 *
3925 * feature flags and the reserved array should be enough room for future
3926 * enhancements of the handshake protocol, and possible plugins...
3927 *
3928 * for now, they are expected to be zero, but ignored.
3929 */
3930static int drbd_send_handshake(struct drbd_conf *mdev)
3931{
3932 /* ASSERT current == mdev->receiver ... */
3933 struct p_handshake *p = &mdev->data.sbuf.handshake;
3934 int ok;
3935
3936 if (mutex_lock_interruptible(&mdev->data.mutex)) {
3937 dev_err(DEV, "interrupted during initial handshake\n");
3938 return 0; /* interrupted. not ok. */
3939 }
3940
3941 if (mdev->data.socket == NULL) {
3942 mutex_unlock(&mdev->data.mutex);
3943 return 0;
3944 }
3945
3946 memset(p, 0, sizeof(*p));
3947 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3948 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3949 ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3950 (struct p_header *)p, sizeof(*p), 0 );
3951 mutex_unlock(&mdev->data.mutex);
3952 return ok;
3953}
3954
3955/*
3956 * return values:
3957 * 1 yes, we have a valid connection
3958 * 0 oops, did not work out, please try again
3959 * -1 peer talks different language,
3960 * no point in trying again, please go standalone.
3961 */
3962static int drbd_do_handshake(struct drbd_conf *mdev)
3963{
3964 /* ASSERT current == mdev->receiver ... */
3965 struct p_handshake *p = &mdev->data.rbuf.handshake;
3966 const int expect = sizeof(struct p_handshake)
3967 -sizeof(struct p_header);
3968 int rv;
3969
3970 rv = drbd_send_handshake(mdev);
3971 if (!rv)
3972 return 0;
3973
3974 rv = drbd_recv_header(mdev, &p->head);
3975 if (!rv)
3976 return 0;
3977
3978 if (p->head.command != P_HAND_SHAKE) {
3979 dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3980 cmdname(p->head.command), p->head.command);
3981 return -1;
3982 }
3983
3984 if (p->head.length != expect) {
3985 dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3986 expect, p->head.length);
3987 return -1;
3988 }
3989
3990 rv = drbd_recv(mdev, &p->head.payload, expect);
3991
3992 if (rv != expect) {
3993 dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3994 return 0;
3995 }
3996
3997 p->protocol_min = be32_to_cpu(p->protocol_min);
3998 p->protocol_max = be32_to_cpu(p->protocol_max);
3999 if (p->protocol_max == 0)
4000 p->protocol_max = p->protocol_min;
4001
4002 if (PRO_VERSION_MAX < p->protocol_min ||
4003 PRO_VERSION_MIN > p->protocol_max)
4004 goto incompat;
4005
4006 mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4007
4008 dev_info(DEV, "Handshake successful: "
4009 "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4010
4011 return 1;
4012
4013 incompat:
4014 dev_err(DEV, "incompatible DRBD dialects: "
4015 "I support %d-%d, peer supports %d-%d\n",
4016 PRO_VERSION_MIN, PRO_VERSION_MAX,
4017 p->protocol_min, p->protocol_max);
4018 return -1;
4019}
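
/*
 * Illustrative sketch, not DRBD code: the handshake above agrees on
 * min(local max, peer max) and fails when the advertised version
 * ranges do not overlap.  negotiate_version() is an invented name.
 */
static int negotiate_version(int my_min, int my_max,
			     int peer_min, int peer_max, int *agreed)
{
	if (peer_max == 0)		/* very old peers report a single version */
		peer_max = peer_min;
	if (my_max < peer_min || my_min > peer_max)
		return -1;		/* disjoint ranges: incompatible */
	*agreed = my_max < peer_max ? my_max : peer_max;
	return 0;
}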
4020
4021#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4022static int drbd_do_auth(struct drbd_conf *mdev)
4023{
4024 dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4025 dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4026 return -1;
4027}
4028#else
4029#define CHALLENGE_LEN 64
4030
4031/* Return value:
4032 1 - auth succeeded,
4033 0 - failed, try again (network error),
4034 -1 - auth failed, don't try again.
4035*/
4036
4037static int drbd_do_auth(struct drbd_conf *mdev)
4038{
4039 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4040 struct scatterlist sg;
4041 char *response = NULL;
4042 char *right_response = NULL;
4043 char *peers_ch = NULL;
4044 struct p_header p;
4045 unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4046 unsigned int resp_size;
4047 struct hash_desc desc;
4048 int rv;
4049
4050 desc.tfm = mdev->cram_hmac_tfm;
4051 desc.flags = 0;
4052
4053 rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4054 (u8 *)mdev->net_conf->shared_secret, key_len);
4055 if (rv) {
4056 dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4057 rv = -1;
4058 goto fail;
4059 }
4060
4061 get_random_bytes(my_challenge, CHALLENGE_LEN);
4062
4063 rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4064 if (!rv)
4065 goto fail;
4066
4067 rv = drbd_recv_header(mdev, &p);
4068 if (!rv)
4069 goto fail;
4070
4071 if (p.command != P_AUTH_CHALLENGE) {
4072 dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4073 cmdname(p.command), p.command);
4074 rv = 0;
4075 goto fail;
4076 }
4077
4078 if (p.length > CHALLENGE_LEN*2) {
4079 dev_err(DEV, "expected AuthChallenge payload too big.\n");
4080 rv = -1;
4081 goto fail;
4082 }
4083
4084 peers_ch = kmalloc(p.length, GFP_NOIO);
4085 if (peers_ch == NULL) {
4086 dev_err(DEV, "kmalloc of peers_ch failed\n");
4087 rv = -1;
4088 goto fail;
4089 }
4090
4091 rv = drbd_recv(mdev, peers_ch, p.length);
4092
4093 if (rv != p.length) {
4094 dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4095 rv = 0;
4096 goto fail;
4097 }
4098
4099 resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4100 response = kmalloc(resp_size, GFP_NOIO);
4101 if (response == NULL) {
4102 dev_err(DEV, "kmalloc of response failed\n");
4103 rv = -1;
4104 goto fail;
4105 }
4106
4107 sg_init_table(&sg, 1);
4108 sg_set_buf(&sg, peers_ch, p.length);
4109
4110 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4111 if (rv) {
4112 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4113 rv = -1;
4114 goto fail;
4115 }
4116
4117 rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4118 if (!rv)
4119 goto fail;
4120
4121 rv = drbd_recv_header(mdev, &p);
4122 if (!rv)
4123 goto fail;
4124
4125 if (p.command != P_AUTH_RESPONSE) {
4126 dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4127 cmdname(p.command), p.command);
4128 rv = 0;
4129 goto fail;
4130 }
4131
4132 if (p.length != resp_size) {
4133 dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4134 rv = 0;
4135 goto fail;
4136 }
4137
4138 rv = drbd_recv(mdev, response, resp_size);
4139
4140 if (rv != resp_size) {
4141 dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4142 rv = 0;
4143 goto fail;
4144 }
4145
4146 right_response = kmalloc(resp_size, GFP_NOIO);
4147 if (right_response == NULL) {
4148 dev_err(DEV, "kmalloc of right_response failed\n");
4149 rv = -1;
4150 goto fail;
4151 }
4152
4153 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4154
4155 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4156 if (rv) {
4157 dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
Johannes Thomab10d96c2010-01-07 16:02:50 +01004158 rv = -1;
Philipp Reisnerb411b362009-09-25 16:07:19 -07004159 goto fail;
4160 }
4161
4162 rv = !memcmp(response, right_response, resp_size);
4163
4164 if (rv)
4165 dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4166 resp_size, mdev->net_conf->cram_hmac_alg);
4167 else
4168 rv = -1;
4169
4170 fail:
4171 kfree(peers_ch);
4172 kfree(response);
4173 kfree(right_response);
4174
4175 return rv;
4176}
4177#endif
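
/*
 * Illustrative sketch, not DRBD code: drbd_do_auth() above runs a
 * symmetric challenge/response: send a random challenge, answer the
 * peer's challenge with HMAC(secret, peer_challenge), and accept the
 * peer only if its answer equals HMAC(secret, my_challenge).  Only the
 * final comparison is sketched; hmac() is a hypothetical callback, not
 * a kernel crypto API.
 */
#include <string.h>
#include <stddef.h>

typedef void (*hmac_fn)(const void *key, size_t key_len,
			const void *msg, size_t msg_len, unsigned char *out);

/* returns 1 when the peer's response matches HMAC(secret, my_challenge) */
static int verify_peer_response(hmac_fn hmac, size_t digest_len,
				const void *secret, size_t secret_len,
				const void *my_challenge, size_t challenge_len,
				const unsigned char *peer_response)
{
	unsigned char expected[64];	/* assumes digest_len <= 64 */

	if (digest_len > sizeof(expected))
		return 0;
	hmac(secret, secret_len, my_challenge, challenge_len, expected);
	return memcmp(peer_response, expected, digest_len) == 0;
}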
4178
4179int drbdd_init(struct drbd_thread *thi)
4180{
4181 struct drbd_conf *mdev = thi->mdev;
4182 unsigned int minor = mdev_to_minor(mdev);
4183 int h;
4184
4185 sprintf(current->comm, "drbd%d_receiver", minor);
4186
4187 dev_info(DEV, "receiver (re)started\n");
4188
4189 do {
4190 h = drbd_connect(mdev);
4191 if (h == 0) {
4192 drbd_disconnect(mdev);
4193 __set_current_state(TASK_INTERRUPTIBLE);
4194 schedule_timeout(HZ);
4195 }
4196 if (h == -1) {
4197 dev_warn(DEV, "Discarding network configuration.\n");
4198 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4199 }
4200 } while (h == 0);
4201
4202 if (h > 0) {
4203 if (get_net_conf(mdev)) {
4204 drbdd(mdev);
4205 put_net_conf(mdev);
4206 }
4207 }
4208
4209 drbd_disconnect(mdev);
4210
4211 dev_info(DEV, "receiver terminated\n");
4212 return 0;
4213}
4214
4215/* ********* acknowledge sender ******** */
4216
4217static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4218{
4219 struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4220
4221 int retcode = be32_to_cpu(p->retcode);
4222
4223 if (retcode >= SS_SUCCESS) {
4224 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4225 } else {
4226 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4227 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4228 drbd_set_st_err_str(retcode), retcode);
4229 }
4230 wake_up(&mdev->state_wait);
4231
4232 return TRUE;
4233}
4234
4235static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4236{
4237 return drbd_send_ping_ack(mdev);
4238
4239}
4240
4241static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4242{
4243 /* restore idle timeout */
4244 mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4245 if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4246 wake_up(&mdev->misc_wait);
4247
4248 return TRUE;
4249}
4250
4251static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4252{
4253 struct p_block_ack *p = (struct p_block_ack *)h;
4254 sector_t sector = be64_to_cpu(p->sector);
4255 int blksize = be32_to_cpu(p->blksize);
4256
4257 D_ASSERT(mdev->agreed_pro_version >= 89);
4258
4259 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4260
4261 drbd_rs_complete_io(mdev, sector);
4262 drbd_set_in_sync(mdev, sector, blksize);
4263 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4264 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4265 dec_rs_pending(mdev);
4266
4267 return TRUE;
4268}
4269
4270/* when we receive the ACK for a write request,
4271 * verify that we actually know about it */
4272static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4273 u64 id, sector_t sector)
4274{
4275 struct hlist_head *slot = tl_hash_slot(mdev, sector);
4276 struct hlist_node *n;
4277 struct drbd_request *req;
4278
4279 hlist_for_each_entry(req, n, slot, colision) {
4280 if ((unsigned long)req == (unsigned long)id) {
4281 if (req->sector != sector) {
4282 dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4283 "wrong sector (%llus versus %llus)\n", req,
4284 (unsigned long long)req->sector,
4285 (unsigned long long)sector);
4286 break;
4287 }
4288 return req;
4289 }
4290 }
4291 dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4292 (void *)(unsigned long)id, (unsigned long long)sector);
4293 return NULL;
4294}
4295
4296typedef struct drbd_request *(req_validator_fn)
4297 (struct drbd_conf *mdev, u64 id, sector_t sector);
4298
4299static int validate_req_change_req_state(struct drbd_conf *mdev,
4300 u64 id, sector_t sector, req_validator_fn validator,
4301 const char *func, enum drbd_req_event what)
4302{
4303 struct drbd_request *req;
4304 struct bio_and_error m;
4305
4306 spin_lock_irq(&mdev->req_lock);
4307 req = validator(mdev, id, sector);
4308 if (unlikely(!req)) {
4309 spin_unlock_irq(&mdev->req_lock);
4310 dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4311 return FALSE;
4312 }
4313 __req_mod(req, what, &m);
4314 spin_unlock_irq(&mdev->req_lock);
4315
4316 if (m.bio)
4317 complete_master_bio(mdev, &m);
4318 return TRUE;
4319}
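
/*
 * Illustrative sketch, not DRBD code: an ACK carries the original
 * request pointer as an opaque 64-bit id, and the validator above
 * re-finds the request by sector and cross-checks the id before
 * trusting the packet.  The types below are invented.
 */
#include <stdint.h>
#include <stddef.h>

struct my_req {
	uint64_t sector;
	struct my_req *hash_next;
};

/* walk one hash bucket; accept only an exact (id, sector) match */
static struct my_req *lookup_req(struct my_req *bucket, uint64_t id, uint64_t sector)
{
	struct my_req *req;

	for (req = bucket; req; req = req->hash_next) {
		if ((uint64_t)(uintptr_t)req != id)
			continue;
		/* id matches: the sector must agree, else the packet is corrupt */
		return req->sector == sector ? req : NULL;
	}
	return NULL;
}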
4320
4321static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4322{
4323 struct p_block_ack *p = (struct p_block_ack *)h;
4324 sector_t sector = be64_to_cpu(p->sector);
4325 int blksize = be32_to_cpu(p->blksize);
4326 enum drbd_req_event what;
4327
4328 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4329
4330 if (is_syncer_block_id(p->block_id)) {
4331 drbd_set_in_sync(mdev, sector, blksize);
4332 dec_rs_pending(mdev);
4333 return TRUE;
4334 }
4335 switch (be16_to_cpu(h->command)) {
4336 case P_RS_WRITE_ACK:
4337 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4338 what = write_acked_by_peer_and_sis;
4339 break;
4340 case P_WRITE_ACK:
4341 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4342 what = write_acked_by_peer;
4343 break;
4344 case P_RECV_ACK:
4345 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4346 what = recv_acked_by_peer;
4347 break;
4348 case P_DISCARD_ACK:
4349 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4350 what = conflict_discarded_by_peer;
4351 break;
4352 default:
4353 D_ASSERT(0);
4354 return FALSE;
4355 }
4356
4357 return validate_req_change_req_state(mdev, p->block_id, sector,
4358 _ack_id_to_req, __func__ , what);
4359}
4360
4361static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4362{
4363 struct p_block_ack *p = (struct p_block_ack *)h;
4364 sector_t sector = be64_to_cpu(p->sector);
4365
4366 if (__ratelimit(&drbd_ratelimit_state))
4367 dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4368
4369 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4370
4371 if (is_syncer_block_id(p->block_id)) {
4372 int size = be32_to_cpu(p->blksize);
4373 dec_rs_pending(mdev);
4374 drbd_rs_failed_io(mdev, sector, size);
4375 return TRUE;
4376 }
4377 return validate_req_change_req_state(mdev, p->block_id, sector,
4378 _ack_id_to_req, __func__ , neg_acked);
4379}
4380
4381static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4382{
4383 struct p_block_ack *p = (struct p_block_ack *)h;
4384 sector_t sector = be64_to_cpu(p->sector);
4385
4386 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4387 dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4388 (unsigned long long)sector, be32_to_cpu(p->blksize));
4389
4390 return validate_req_change_req_state(mdev, p->block_id, sector,
4391 _ar_id_to_req, __func__ , neg_acked);
4392}
4393
4394static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4395{
4396 sector_t sector;
4397 int size;
4398 struct p_block_ack *p = (struct p_block_ack *)h;
4399
4400 sector = be64_to_cpu(p->sector);
4401 size = be32_to_cpu(p->blksize);
4402
4403 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4404
4405 dec_rs_pending(mdev);
4406
4407 if (get_ldev_if_state(mdev, D_FAILED)) {
4408 drbd_rs_complete_io(mdev, sector);
4409 drbd_rs_failed_io(mdev, sector, size);
4410 put_ldev(mdev);
4411 }
4412
4413 return TRUE;
4414}
4415
4416static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4417{
4418 struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4419
4420 tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4421
4422 return TRUE;
4423}
4424
4425static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4426{
4427 struct p_block_ack *p = (struct p_block_ack *)h;
4428 struct drbd_work *w;
4429 sector_t sector;
4430 int size;
4431
4432 sector = be64_to_cpu(p->sector);
4433 size = be32_to_cpu(p->blksize);
4434
4435 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4436
4437 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4438 drbd_ov_oos_found(mdev, sector, size);
4439 else
4440 ov_oos_print(mdev);
4441
4442 drbd_rs_complete_io(mdev, sector);
4443 dec_rs_pending(mdev);
4444
4445 if (--mdev->ov_left == 0) {
4446 w = kmalloc(sizeof(*w), GFP_NOIO);
4447 if (w) {
4448 w->cb = w_ov_finished;
4449 drbd_queue_work_front(&mdev->data.work, w);
4450 } else {
4451 dev_err(DEV, "kmalloc(w) failed.");
4452 ov_oos_print(mdev);
4453 drbd_resync_finished(mdev);
4454 }
4455 }
4456 return TRUE;
4457}
4458
4459static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4460{
4461 struct p_delay_probe *p = (struct p_delay_probe *)h;
4462
4463 got_delay_probe(mdev, USE_META_SOCKET, p);
4464 return TRUE;
4465}
4466
4467struct asender_cmd {
4468 size_t pkt_size;
4469 int (*process)(struct drbd_conf *mdev, struct p_header *h);
4470};
4471
4472static struct asender_cmd *get_asender_cmd(int cmd)
4473{
4474 static struct asender_cmd asender_tbl[] = {
4475 /* anything missing from this table is in
4476 * the drbd_cmd_handler (drbd_default_handler) table,
4477 * see the beginning of drbdd() */
4478 [P_PING] = { sizeof(struct p_header), got_Ping },
4479 [P_PING_ACK] = { sizeof(struct p_header), got_PingAck },
4480 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4481 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4482 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4483 [P_DISCARD_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
4484 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
4485 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
4486 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply},
4487 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
4488 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
4489 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4490 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
4491 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe), got_delay_probe_m },
4492 [P_MAX_CMD] = { 0, NULL },
4493 };
4494 if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4495 return NULL;
4496 return &asender_tbl[cmd];
4497}
4498
4499int drbd_asender(struct drbd_thread *thi)
4500{
4501 struct drbd_conf *mdev = thi->mdev;
4502 struct p_header *h = &mdev->meta.rbuf.header;
4503 struct asender_cmd *cmd = NULL;
4504
4505 int rv, len;
4506 void *buf = h;
4507 int received = 0;
4508 int expect = sizeof(struct p_header);
4509 int empty;
4510
4511 sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4512
4513 current->policy = SCHED_RR; /* Make this a realtime task! */
4514 current->rt_priority = 2; /* more important than all other tasks */
4515
4516 while (get_t_state(thi) == Running) {
4517 drbd_thread_current_set_cpu(mdev);
4518 if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4519 ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4520 mdev->meta.socket->sk->sk_rcvtimeo =
4521 mdev->net_conf->ping_timeo*HZ/10;
4522 }
4523
4524 /* conditionally cork;
4525 * it may hurt latency if we cork without much to send */
4526 if (!mdev->net_conf->no_cork &&
4527 3 < atomic_read(&mdev->unacked_cnt))
4528 drbd_tcp_cork(mdev->meta.socket);
4529 while (1) {
4530 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4531 flush_signals(current);
4532 if (!drbd_process_done_ee(mdev)) {
4533 dev_err(DEV, "process_done_ee() = NOT_OK\n");
4534 goto reconnect;
4535 }
4536 /* to avoid race with newly queued ACKs */
4537 set_bit(SIGNAL_ASENDER, &mdev->flags);
4538 spin_lock_irq(&mdev->req_lock);
4539 empty = list_empty(&mdev->done_ee);
4540 spin_unlock_irq(&mdev->req_lock);
4541 /* new ack may have been queued right here,
4542 * but then there is also a signal pending,
4543 * and we start over... */
4544 if (empty)
4545 break;
4546 }
4547 /* but unconditionally uncork unless disabled */
4548 if (!mdev->net_conf->no_cork)
4549 drbd_tcp_uncork(mdev->meta.socket);
4550
4551 /* short circuit, recv_msg would return EINTR anyways. */
4552 if (signal_pending(current))
4553 continue;
4554
4555 rv = drbd_recv_short(mdev, mdev->meta.socket,
4556 buf, expect-received, 0);
4557 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4558
4559 flush_signals(current);
4560
4561 /* Note:
4562 * -EINTR (on meta) we got a signal
4563 * -EAGAIN (on meta) rcvtimeo expired
4564 * -ECONNRESET other side closed the connection
4565 * -ERESTARTSYS (on data) we got a signal
4566 * rv < 0 other than above: unexpected error!
4567 * rv == expected: full header or command
4568 * rv < expected: "woken" by signal during receive
4569 * rv == 0 : "connection shut down by peer"
4570 */
4571 if (likely(rv > 0)) {
4572 received += rv;
4573 buf += rv;
4574 } else if (rv == 0) {
4575 dev_err(DEV, "meta connection shut down by peer.\n");
4576 goto reconnect;
4577 } else if (rv == -EAGAIN) {
4578 if (mdev->meta.socket->sk->sk_rcvtimeo ==
4579 mdev->net_conf->ping_timeo*HZ/10) {
4580 dev_err(DEV, "PingAck did not arrive in time.\n");
4581 goto reconnect;
4582 }
4583 set_bit(SEND_PING, &mdev->flags);
4584 continue;
4585 } else if (rv == -EINTR) {
4586 continue;
4587 } else {
4588 dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4589 goto reconnect;
4590 }
4591
4592 if (received == expect && cmd == NULL) {
4593 if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4594 dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4595 (long)be32_to_cpu(h->magic),
4596 h->command, h->length);
4597 goto reconnect;
4598 }
4599 cmd = get_asender_cmd(be16_to_cpu(h->command));
4600 len = be16_to_cpu(h->length);
4601 if (unlikely(cmd == NULL)) {
4602 dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4603 (long)be32_to_cpu(h->magic),
4604 h->command, h->length);
4605 goto disconnect;
4606 }
4607 expect = cmd->pkt_size;
4608 ERR_IF(len != expect-sizeof(struct p_header))
4609 goto reconnect;
4610 }
4611 if (received == expect) {
4612 D_ASSERT(cmd != NULL);
4613 if (!cmd->process(mdev, h))
4614 goto reconnect;
4615
4616 buf = h;
4617 received = 0;
4618 expect = sizeof(struct p_header);
4619 cmd = NULL;
4620 }
4621 }
4622
4623 if (0) {
4624reconnect:
4625 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4626 }
4627 if (0) {
4628disconnect:
4629 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4630 }
4631 clear_bit(SIGNAL_ASENDER, &mdev->flags);
4632
4633 D_ASSERT(mdev->state.conn < C_CONNECTED);
4634 dev_info(DEV, "asender terminated\n");
4635
4636 return 0;
4637}
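
/*
 * Illustrative sketch, not DRBD code: the asender above accumulates
 * bytes until received == expect, first for the fixed-size header,
 * then (once the command and therefore the packet size are known) for
 * the rest of the packet.  Userspace version over a plain fd.
 */
#include <unistd.h>
#include <errno.h>
#include <stddef.h>

/* read exactly want bytes; 1 = ok, 0 = peer closed, -1 = error */
static int read_exact(int fd, void *buf, size_t want)
{
	size_t received = 0;
	ssize_t rv;

	while (received < want) {
		rv = read(fd, (char *)buf + received, want - received);
		if (rv > 0)
			received += (size_t)rv;	/* short read: keep accumulating */
		else if (rv == 0)
			return 0;		/* connection shut down by peer */
		else if (errno == EINTR)
			continue;		/* interrupted by a signal: retry */
		else
			return -1;		/* unexpected error */
	}
	return 1;
}
/* usage: read_exact(fd, &hdr, sizeof(hdr)); look up the size; then read the payload */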