/*
 * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
 * All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include <linux/buffer_head.h>
#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/connector.h>
#include <linux/dst.h>
#include <linux/device.h>
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/socket.h>
#include <linux/slab.h>

#include <net/sock.h>

/*
 * Polling machinery.
 */

struct dst_poll_helper
{
	poll_table pt;
	struct dst_state *st;
};

static int dst_queue_wake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	struct dst_state *st = container_of(wait, struct dst_state, wait);

	wake_up(&st->thread_wait);
	return 1;
}

static void dst_queue_func(struct file *file, wait_queue_head_t *whead,
		poll_table *pt)
{
	struct dst_state *st = container_of(pt, struct dst_poll_helper, pt)->st;

	st->whead = whead;
	init_waitqueue_func_entry(&st->wait, dst_queue_wake);
	add_wait_queue(whead, &st->wait);
}

void dst_poll_exit(struct dst_state *st)
{
	if (st->whead) {
		remove_wait_queue(st->whead, &st->wait);
		st->whead = NULL;
	}
}

int dst_poll_init(struct dst_state *st)
{
	struct dst_poll_helper ph;

	ph.st = st;
	init_poll_funcptr(&ph.pt, &dst_queue_func);

	st->socket->ops->poll(NULL, st->socket, &ph.pt);
	return 0;
}
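
/*
 * How the polling pieces above fit together (a sketch, using only the
 * names defined in this file): dst_poll_init() runs the socket's
 * ->poll() once with dst_queue_func as the callback, which installs
 * dst_queue_wake on the socket's wait queue head. From then on any
 * socket event does wake_up(&st->thread_wait), which dst_data_recv()
 * below sleeps on, and dst_poll_exit() removes the entry again before
 * the socket is shut down:
 *
 *	err = dst_poll_init(st);	// hook into the socket's wait queue
 *	...				// dst_data_recv() may now sleep on st->thread_wait
 *	dst_poll_exit(st);		// unhook before shutdown/release
 */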

/*
 * Header receiving function - may block.
 */
static int dst_data_recv_header(struct socket *sock,
		void *data, unsigned int size, int block)
{
	struct msghdr msg;
	struct kvec iov;
	int err;

	iov.iov_base = data;
	iov.iov_len = size;

	msg.msg_iov = (struct iovec *)&iov;
	msg.msg_iovlen = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = block ? MSG_WAITALL : MSG_DONTWAIT;

	err = kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len,
			msg.msg_flags);
	if (err != size)
		return -1;

	return 0;
}

/*
 * Header sending function - may block.
 */
int dst_data_send_header(struct socket *sock,
		void *data, unsigned int size, int more)
{
	struct msghdr msg;
	struct kvec iov;
	int err;

	iov.iov_base = data;
	iov.iov_len = size;

	msg.msg_iov = (struct iovec *)&iov;
	msg.msg_iovlen = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_WAITALL | (more ? MSG_MORE : 0);

	err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	if (err != size) {
		dprintk("%s: size: %u, more: %d, err: %d.\n",
				__func__, size, more, err);
		return -1;
	}

	return 0;
}
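
/*
 * Usage note: callers pass 'more' as a hint that payload follows the
 * header. For example dst_send_bio() below sends the command header
 * with more == bio->bi_vcnt, so MSG_MORE lets the stack coalesce the
 * header with the subsequent ->sendpage() data.
 */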

/*
 * Block autoconfiguration: request size of the storage and permissions.
 */
static int dst_request_remote_config(struct dst_state *st)
{
	struct dst_node *n = st->node;
	int err = -EINVAL;
	struct dst_cmd *cmd = st->data;

	memset(cmd, 0, sizeof(struct dst_cmd));
	cmd->cmd = DST_CFG;

	dst_convert_cmd(cmd);

	err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
	if (err)
		goto out;

	err = dst_data_recv_header(st->socket, cmd, sizeof(struct dst_cmd), 1);
	if (err)
		goto out;

	dst_convert_cmd(cmd);

	if (cmd->cmd != DST_CFG) {
		err = -EINVAL;
		dprintk("%s: checking result: cmd: %d, size reported: %llu.\n",
			__func__, cmd->cmd, cmd->sector);
		goto out;
	}

	if (n->size != 0)
		n->size = min_t(loff_t, n->size, cmd->sector);
	else
		n->size = cmd->sector;

	n->info->size = n->size;
	st->permissions = cmd->rw;

out:
	dprintk("%s: n: %p, err: %d, size: %llu, permission: %x.\n",
			__func__, n, err, n->size, st->permissions);
	return err;
}
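
/*
 * The autoconfiguration exchange as implemented in this file: the
 * client sends an empty DST_CFG command, the remote side answers it in
 * dst_process_cfg() with the exported size in ->sector and the allowed
 * access mode in ->rw, and the client clamps its node size to the
 * minimum of the local and remote values.
 */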

/*
 * Socket machinery.
 */

#define DST_DEFAULT_TIMEO	20000

int dst_state_socket_create(struct dst_state *st)
{
	int err;
	struct socket *sock;
	struct dst_network_ctl *ctl = &st->ctl;

	err = sock_create(ctl->addr.sa_family, ctl->type, ctl->proto, &sock);
	if (err < 0)
		return err;

	sock->sk->sk_sndtimeo = sock->sk->sk_rcvtimeo =
		msecs_to_jiffies(DST_DEFAULT_TIMEO);
	sock->sk->sk_allocation = GFP_NOIO;

	st->socket = st->read_socket = sock;
	return 0;
}

void dst_state_socket_release(struct dst_state *st)
{
	dprintk("%s: st: %p, socket: %p, n: %p.\n",
			__func__, st, st->socket, st->node);
	if (st->socket) {
		sock_release(st->socket);
		st->socket = NULL;
		st->read_socket = NULL;
	}
}

void dst_dump_addr(struct socket *sk, struct sockaddr *sa, char *str)
{
	if (sk->ops->family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)sa;
		printk(KERN_INFO "%s %u.%u.%u.%u:%d.\n",
			str, NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));
	} else if (sk->ops->family == AF_INET6) {
		struct sockaddr_in6 *sin = (struct sockaddr_in6 *)sa;
		printk(KERN_INFO "%s %pi6:%d.\n",
			str, &sin->sin6_addr, ntohs(sin->sin6_port));
	}
}

void dst_state_exit_connected(struct dst_state *st)
{
	if (st->socket) {
		dst_poll_exit(st);
		st->socket->ops->shutdown(st->socket, 2); /* SHUT_RDWR */

		dst_dump_addr(st->socket, (struct sockaddr *)&st->ctl.addr,
			"Disconnected peer");
		dst_state_socket_release(st);
	}
}

static int dst_state_init_connected(struct dst_state *st)
{
	int err;
	struct dst_network_ctl *ctl = &st->ctl;

	err = dst_state_socket_create(st);
	if (err)
		goto err_out_exit;

	err = kernel_connect(st->socket, (struct sockaddr *)&st->ctl.addr,
			st->ctl.addr.sa_data_len, 0);
	if (err)
		goto err_out_release;

	err = dst_poll_init(st);
	if (err)
		goto err_out_release;

	dst_dump_addr(st->socket, (struct sockaddr *)&ctl->addr,
			"Connected to peer");

	return 0;

err_out_release:
	dst_state_socket_release(st);
err_out_exit:
	return err;
}

/*
 * State reset is used to reconnect to the remote peer.
 * May fail, but who cares, we will try again later.
 */
static inline void dst_state_reset_nolock(struct dst_state *st)
{
	dst_state_exit_connected(st);
	dst_state_init_connected(st);
}

static inline void dst_state_reset(struct dst_state *st)
{
	dst_state_lock(st);
	dst_state_reset_nolock(st);
	dst_state_unlock(st);
}
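
/*
 * The reset is deliberately fire-and-forget: the receiving thread
 * (dst_recv() below) calls dst_state_reset() in a loop with a one
 * second sleep, and dst_trans_send() reconnects lazily whenever it
 * finds st->socket == NULL, so a failed attempt here is simply retried
 * later.
 */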

/*
 * Basic network sending/receiving functions.
 * Blocking mode is used.
 */
static int dst_data_recv_raw(struct dst_state *st, void *buf, u64 size)
{
	struct msghdr msg;
	struct kvec iov;
	int err;

	BUG_ON(!size);

	iov.iov_base = buf;
	iov.iov_len = size;

	msg.msg_iov = (struct iovec *)&iov;
	msg.msg_iovlen = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_DONTWAIT;

	err = kernel_recvmsg(st->socket, &msg, &iov, 1, iov.iov_len,
			msg.msg_flags);
	if (err <= 0) {
		dprintk("%s: failed to recv data: size: %llu, err: %d.\n",
				__func__, size, err);
		if (err == 0)
			err = -ECONNRESET;

		dst_state_exit_connected(st);
	}

	return err;
}
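
/*
 * MSG_DONTWAIT is sufficient here because dst_data_recv() only calls
 * this after poll reported POLLIN. A zero return means an orderly
 * shutdown by the peer and is turned into -ECONNRESET; in every error
 * case the connection is dropped so the upper layers can reconnect.
 */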

/*
 * Ping command, used to detect failed nodes early.
 */
static int dst_send_ping(struct dst_state *st)
{
	struct dst_cmd *cmd = st->data;
	int err = -ECONNRESET;

	dst_state_lock(st);
	if (st->socket) {
		memset(cmd, 0, sizeof(struct dst_cmd));

		cmd->cmd = __cpu_to_be32(DST_PING);

		err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
	}
	dprintk("%s: st: %p, socket: %p, err: %d.\n", __func__, st, st->socket, err);
	dst_state_unlock(st);

	return err;
}

/*
 * Receiving function, which should either return an error or read the
 * whole block request. If there was no traffic for one second,
 * send a ping, since the remote node may have died.
 */
int dst_data_recv(struct dst_state *st, void *data, unsigned int size)
{
	unsigned int revents = 0;
	unsigned int err_mask = POLLERR | POLLHUP | POLLRDHUP;
	unsigned int mask = err_mask | POLLIN;
	struct dst_node *n = st->node;
	int err = 0;

	while (size && !err) {
		revents = dst_state_poll(st);

		if (!(revents & mask)) {
			DEFINE_WAIT(wait);

			for (;;) {
				prepare_to_wait(&st->thread_wait, &wait,
						TASK_INTERRUPTIBLE);
				if (!n->trans_scan_timeout || st->need_exit)
					break;

				revents = dst_state_poll(st);

				if (revents & mask)
					break;

				if (signal_pending(current))
					break;

				if (!schedule_timeout(HZ)) {
					err = dst_send_ping(st);
					if (err)
						return err;
				}

				continue;
			}
			finish_wait(&st->thread_wait, &wait);
		}

		err = -ECONNRESET;
		dst_state_lock(st);

		if (st->socket &&
				(st->read_socket == st->socket) &&
				(revents & POLLIN)) {
			err = dst_data_recv_raw(st, data, size);
			if (err > 0) {
				data += err;
				size -= err;
				err = 0;
			}
		}

		if ((revents & err_mask) || !st->socket) {
			dprintk("%s: revents: %x, socket: %p, size: %u, err: %d.\n",
					__func__, revents, st->socket, size, err);
			err = -ECONNRESET;
		}

		dst_state_unlock(st);

		if (!n->trans_scan_timeout)
			err = -ENODEV;
	}

	return err;
}
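
/*
 * To summarize the loop above: poll the socket, sleep for up to one
 * second waiting for readiness (sending DST_PING on every full
 * timeout), and only read when POLLIN is set and the socket is still
 * the one the current command was started on (st->read_socket ==
 * st->socket), so that a reconnect in the middle of a command cannot
 * mix data from two different connections.
 */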

/*
 * Send block autoconf reply.
 */
static int dst_process_cfg(struct dst_state *st)
{
	struct dst_node *n = st->node;
	struct dst_cmd *cmd = st->data;
	int err;

	cmd->sector = n->size;
	cmd->rw = st->permissions;

	dst_convert_cmd(cmd);

	dst_state_lock(st);
	err = dst_data_send_header(st->socket, cmd, sizeof(struct dst_cmd), 0);
	dst_state_unlock(st);

	return err;
}

/*
 * Receive block IO from the network.
 */
static int dst_recv_bio(struct dst_state *st, struct bio *bio, unsigned int total_size)
{
	struct bio_vec *bv;
	int i, err;
	void *data;
	unsigned int sz;

	bio_for_each_segment(bv, bio, i) {
		sz = min(total_size, bv->bv_len);

		dprintk("%s: bio: %llu/%u, total: %u, len: %u, sz: %u, off: %u.\n",
			__func__, (u64)bio->bi_sector, bio->bi_size, total_size,
			bv->bv_len, sz, bv->bv_offset);

		data = kmap(bv->bv_page) + bv->bv_offset;
		err = dst_data_recv(st, data, sz);
		kunmap(bv->bv_page);

		bv->bv_len = sz;

		if (err)
			return err;

		total_size -= sz;
		if (total_size == 0)
			break;
	}

	return 0;
}
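
/*
 * Each segment is filled through a temporary kmap() of its page, and
 * bv_len is trimmed to the number of bytes actually expected for that
 * segment, presumably so that later per-segment processing (e.g. the
 * crypto path) sees the real transfer length; the loop stops once
 * total_size bytes have been read.
 */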

/*
 * Our block IO has just completed and arrived: get it.
 */
static int dst_process_io_response(struct dst_state *st)
{
	struct dst_node *n = st->node;
	struct dst_cmd *cmd = st->data;
	struct dst_trans *t;
	int err = 0;
	struct bio *bio;

	mutex_lock(&n->trans_lock);
	t = dst_trans_search(n, cmd->id);
	mutex_unlock(&n->trans_lock);

	if (!t)
		goto err_out_exit;

	bio = t->bio;

	dprintk("%s: bio: %llu/%u, cmd_size: %u, csize: %u, dir: %lu.\n",
		__func__, (u64)bio->bi_sector, bio->bi_size, cmd->size,
		cmd->csize, bio_data_dir(bio));

	if (bio_data_dir(bio) == READ) {
		if (bio->bi_size != cmd->size - cmd->csize)
			goto err_out_exit;

		if (dst_need_crypto(n)) {
			err = dst_recv_cdata(st, t->cmd.hash);
			if (err)
				goto err_out_exit;
		}

		err = dst_recv_bio(st, t->bio, bio->bi_size);
		if (err)
			goto err_out_exit;

		if (dst_need_crypto(n))
			return dst_trans_crypto(t);
	} else {
		err = -EBADMSG;
		if (cmd->size || cmd->csize)
			goto err_out_exit;
	}

	dst_trans_remove(t);
	dst_trans_put(t);

	return 0;

err_out_exit:
	return err;
}

/*
 * Receive crypto data.
 */
int dst_recv_cdata(struct dst_state *st, void *cdata)
{
	struct dst_cmd *cmd = st->data;
	struct dst_node *n = st->node;
	struct dst_crypto_ctl *c = &n->crypto;
	int err;

	if (cmd->csize != c->crypto_attached_size) {
		dprintk("%s: cmd: cmd: %u, sector: %llu, size: %u, "
				"csize: %u != digest size %u.\n",
				__func__, cmd->cmd, cmd->sector, cmd->size,
				cmd->csize, c->crypto_attached_size);
		err = -EINVAL;
		goto err_out_exit;
	}

	err = dst_data_recv(st, cdata, cmd->csize);
	if (err)
		goto err_out_exit;

	cmd->size -= cmd->csize;
	return 0;

err_out_exit:
	return err;
}

/*
 * Receive the command and start its processing.
 */
static int dst_recv_processing(struct dst_state *st)
{
	int err = -EINTR;
	struct dst_cmd *cmd = st->data;

	/*
	 * If the socket is reset after this statement, then
	 * dst_data_recv() below will simply fail and the loop will
	 * start again, so this can be done without any locks.
	 *
	 * st->read_socket is needed to prevent the state machine from
	 * breaking between this read and the subsequent ones in the
	 * protocol specific functions during a connection reset.
	 * In case of a reset we have to read the next command and must
	 * not expect data for the old command to magically appear on
	 * the new connection.
	 */
	st->read_socket = st->socket;
	err = dst_data_recv(st, cmd, sizeof(struct dst_cmd));
	if (err)
		goto out_exit;

	dst_convert_cmd(cmd);

	dprintk("%s: cmd: %u, size: %u, csize: %u, id: %llu, "
			"sector: %llu, flags: %llx, rw: %llx.\n",
			__func__, cmd->cmd, cmd->size,
			cmd->csize, cmd->id, cmd->sector,
			cmd->flags, cmd->rw);

	/*
	 * This should catch protocol breakage and random garbage received
	 * instead of commands.
	 */
	if (unlikely(cmd->csize > st->size - sizeof(struct dst_cmd))) {
		err = -EBADMSG;
		goto out_exit;
	}

	err = -EPROTO;
	switch (cmd->cmd) {
	case DST_IO_RESPONSE:
		err = dst_process_io_response(st);
		break;
	case DST_IO:
		err = dst_process_io(st);
		break;
	case DST_CFG:
		err = dst_process_cfg(st);
		break;
	case DST_PING:
		err = 0;
		break;
	default:
		break;
	}

out_exit:
	return err;
}
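
/*
 * In short: every inbound message starts with a fixed-size struct
 * dst_cmd header, optionally followed by csize bytes of crypto data and
 * the block payload. The switch above dispatches it: DST_IO_RESPONSE
 * completes one of our transactions, DST_IO is handed to
 * dst_process_io(), DST_CFG answers autoconfiguration and DST_PING
 * needs no reply.
 */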

/*
 * Receiving thread. For a client node we should try to reconnect,
 * for an accepted client we just drop the state and expect it to
 * reconnect.
 */
static int dst_recv(void *init_data, void *schedule_data)
{
	struct dst_state *st = schedule_data;
	struct dst_node *n = init_data;
	int err = 0;

	dprintk("%s: start st: %p, n: %p, scan: %lu, need_exit: %d.\n",
			__func__, st, n, n->trans_scan_timeout, st->need_exit);

	while (n->trans_scan_timeout && !st->need_exit) {
		err = dst_recv_processing(st);
		if (err < 0) {
			if (!st->ctl.type)
				break;

			if (!n->trans_scan_timeout || st->need_exit)
				break;

			dst_state_reset(st);
			msleep(1000);
		}
	}

	st->need_exit = 1;
	wake_up(&st->thread_wait);

	dprintk("%s: freeing receiving socket st: %p.\n", __func__, st);
	dst_state_lock(st);
	dst_state_exit_connected(st);
	dst_state_unlock(st);
	dst_state_put(st);

	dprintk("%s: freed receiving socket st: %p.\n", __func__, st);

	return err;
}

/*
 * The network state dies here and is born again a couple of lines below.
 * This object is the main network state processing engine:
 * sending, receiving, reconnections, all network related
 * tasks are handled on behalf of the state.
 */
static void dst_state_free(struct dst_state *st)
{
	dprintk("%s: st: %p.\n", __func__, st);
	if (st->cleanup)
		st->cleanup(st);
	kfree(st->data);
	kfree(st);
}

struct dst_state *dst_state_alloc(struct dst_node *n)
{
	struct dst_state *st;
	int err = -ENOMEM;

	st = kzalloc(sizeof(struct dst_state), GFP_KERNEL);
	if (!st)
		goto err_out_exit;

	st->node = n;
	st->need_exit = 0;

	st->size = PAGE_SIZE;
	st->data = kmalloc(st->size, GFP_KERNEL);
	if (!st->data)
		goto err_out_free;

	spin_lock_init(&st->request_lock);
	INIT_LIST_HEAD(&st->request_list);

	mutex_init(&st->state_lock);
	init_waitqueue_head(&st->thread_wait);

	/*
	 * One reference for the processing thread, another one for the
	 * node itself.
	 */
	atomic_set(&st->refcnt, 2);

	dprintk("%s: st: %p, n: %p.\n", __func__, st, st->node);

	return st;

err_out_free:
	kfree(st);
err_out_exit:
	return ERR_PTR(err);
}

int dst_state_schedule_receiver(struct dst_state *st)
{
	return thread_pool_schedule_private(st->node->pool, dst_thread_setup,
			dst_recv, st, MAX_SCHEDULE_TIMEOUT, st->node);
}

/*
 * Initialize client's connection to the remote peer: allocate state,
 * connect and perform block IO autoconfiguration.
 */
int dst_node_init_connected(struct dst_node *n, struct dst_network_ctl *r)
{
	struct dst_state *st;
	int err = -ENOMEM;

	st = dst_state_alloc(n);
	if (IS_ERR(st)) {
		err = PTR_ERR(st);
		goto err_out_exit;
	}
	memcpy(&st->ctl, r, sizeof(struct dst_network_ctl));

	err = dst_state_init_connected(st);
	if (err)
		goto err_out_free_data;

	err = dst_request_remote_config(st);
	if (err)
		goto err_out_exit_connected;
	n->state = st;

	err = dst_state_schedule_receiver(st);
	if (err)
		goto err_out_exit_connected;

	return 0;

err_out_exit_connected:
	dst_state_exit_connected(st);
err_out_free_data:
	dst_state_free(st);
err_out_exit:
	n->state = NULL;
	return err;
}
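
/*
 * A minimal usage sketch, assuming the caller has already filled in a
 * struct dst_network_ctl 'ctl' (a hypothetical local variable)
 * describing the remote peer:
 *
 *	err = dst_node_init_connected(n, &ctl);
 *
 * This allocates the state, connects, runs the DST_CFG handshake and
 * schedules dst_recv() on the node's thread pool; on failure the state
 * is freed and n->state is left NULL.
 */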

void dst_state_put(struct dst_state *st)
{
	dprintk("%s: st: %p, refcnt: %d.\n",
			__func__, st, atomic_read(&st->refcnt));
	if (atomic_dec_and_test(&st->refcnt))
		dst_state_free(st);
}

/*
 * Send block IO pages to the network one by one using zero-copy
 * ->sendpage().
 */
int dst_send_bio(struct dst_state *st, struct dst_cmd *cmd, struct bio *bio)
{
	struct bio_vec *bv;
	struct dst_crypto_ctl *c = &st->node->crypto;
	int err, i = 0;
	int flags = MSG_WAITALL;

	err = dst_data_send_header(st->socket, cmd,
		sizeof(struct dst_cmd) + c->crypto_attached_size, bio->bi_vcnt);
	if (err)
		goto err_out_exit;

	bio_for_each_segment(bv, bio, i) {
		flags = MSG_WAITALL;
		if (i < bio->bi_vcnt - 1)
			flags |= MSG_MORE;

		err = kernel_sendpage(st->socket, bv->bv_page, bv->bv_offset,
				bv->bv_len, flags);
		if (err <= 0)
			goto err_out_exit;
	}

	return 0;

err_out_exit:
	dprintk("%s: %d/%d, flags: %x, err: %d.\n",
			__func__, i, bio->bi_vcnt, flags, err);
	return err;
}
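
/*
 * The header is sent with 'more' set to the number of bio segments and
 * every segment except the last is sent with MSG_MORE, so the network
 * stack may coalesce header and data while still using zero-copy
 * ->sendpage() for the pages themselves.
 */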

/*
 * Send transaction to the remote peer.
 */
int dst_trans_send(struct dst_trans *t)
{
	int err;
	struct dst_state *st = t->n->state;
	struct bio *bio = t->bio;

	dst_convert_cmd(&t->cmd);

	dst_state_lock(st);
	if (!st->socket) {
		err = dst_state_init_connected(st);
		if (err)
			goto err_out_unlock;
	}

	if (bio_data_dir(bio) == WRITE) {
		err = dst_send_bio(st, &t->cmd, t->bio);
	} else {
		err = dst_data_send_header(st->socket, &t->cmd,
				sizeof(struct dst_cmd), 0);
	}
	if (err)
		goto err_out_reset;

	dst_state_unlock(st);
	return 0;

err_out_reset:
	dst_state_reset_nolock(st);
err_out_unlock:
	dst_state_unlock(st);

	return err;
}