blob: dc83a9d979b5285bb5e9704a9d32cef199a5c129 [file] [log] [blame]
David Teiglande7fd4172006-01-18 09:30:29 +00001/******************************************************************************
2*******************************************************************************
3**
4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
Patrick Caulfieldac33d072006-12-06 15:10:37 +00005** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
David Teiglande7fd4172006-01-18 09:30:29 +00006**
7** This copyrighted material is made available to anyone wishing to use,
8** modify, copy, or redistribute it subject to the terms and conditions
9** of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
28 *
29 * The comms level is two kernel threads that deal mainly with
30 * the receiving of messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
38 *
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never (well, hardly ever) waits.
42 *
43 */
44
45#include <asm/ioctls.h>
46#include <net/sock.h>
47#include <net/tcp.h>
48#include <net/sctp/user.h>
49#include <linux/pagemap.h>
50#include <linux/socket.h>
51#include <linux/idr.h>
52
53#include "dlm_internal.h"
54#include "lowcomms.h"
55#include "config.h"
56#include "midcomms.h"
57
Steven Whitehouse47c962982006-05-25 17:43:14 -040058static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
59static int dlm_local_count;
60static int dlm_local_nodeid;
David Teiglande7fd4172006-01-18 09:30:29 +000061
62/* One of these per connected node */
63
64#define NI_INIT_PENDING 1
65#define NI_WRITE_PENDING 2
66
67struct nodeinfo {
68 spinlock_t lock;
69 sctp_assoc_t assoc_id;
70 unsigned long flags;
71 struct list_head write_list; /* nodes with pending writes */
72 struct list_head writequeue; /* outgoing writequeue_entries */
73 spinlock_t writequeue_lock;
74 int nodeid;
Patrick Caulfield1d6e8132007-01-15 14:33:34 +000075 struct work_struct swork; /* Send workqueue */
76 struct work_struct lwork; /* Locking workqueue */
David Teiglande7fd4172006-01-18 09:30:29 +000077};
78
/*
 * nodeid -> struct nodeinfo map.  Lookups take nodeinfo_lock for reading,
 * insertions for writing.  max_nodeid is the largest nodeid inserted so far
 * and bounds the linear scans over all nodes.
 */
static DEFINE_IDR(nodeinfo_idr);
static DECLARE_RWSEM(nodeinfo_lock);
static int max_nodeid;
David Teiglande7fd4172006-01-18 09:30:29 +000082
/* Circular buffer bookkeeping; the size must be a power of two (see mask) */
struct cbuf {
	unsigned int base;	/* offset of the first used byte */
	unsigned int len;	/* number of used bytes */
	unsigned int mask;	/* size - 1, used to wrap offsets */
};
88
89/* Just the one of these, now. But this struct keeps
90 the connection-specific variables together */
91
92#define CF_READ_PENDING 1
93
94struct connection {
Patrick Caulfieldac33d072006-12-06 15:10:37 +000095 struct socket *sock;
David Teiglande7fd4172006-01-18 09:30:29 +000096 unsigned long flags;
Patrick Caulfieldac33d072006-12-06 15:10:37 +000097 struct page *rx_page;
David Teiglande7fd4172006-01-18 09:30:29 +000098 atomic_t waiting_requests;
99 struct cbuf cb;
100 int eagain_flag;
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000101 struct work_struct work; /* Send workqueue */
David Teiglande7fd4172006-01-18 09:30:29 +0000102};
103
104/* An entry waiting to be sent */
105
106struct writequeue_entry {
107 struct list_head list;
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000108 struct page *page;
David Teiglande7fd4172006-01-18 09:30:29 +0000109 int offset;
110 int len;
111 int end;
112 int users;
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000113 struct nodeinfo *ni;
David Teiglande7fd4172006-01-18 09:30:29 +0000114};
115
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000116static void cbuf_add(struct cbuf *cb, int n)
117{
118 cb->len += n;
119}
David Teiglande7fd4172006-01-18 09:30:29 +0000120
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000121static int cbuf_data(struct cbuf *cb)
122{
123 return ((cb->base + cb->len) & cb->mask);
124}
David Teiglande7fd4172006-01-18 09:30:29 +0000125
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000126static void cbuf_init(struct cbuf *cb, int size)
127{
128 cb->base = cb->len = 0;
129 cb->mask = size-1;
130}
David Teiglande7fd4172006-01-18 09:30:29 +0000131
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000132static void cbuf_eat(struct cbuf *cb, int n)
133{
134 cb->len -= n;
135 cb->base += n;
136 cb->base &= cb->mask;
137}
David Teiglande7fd4172006-01-18 09:30:29 +0000138
139/* List of nodes which have writes pending */
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000140static LIST_HEAD(write_nodes);
141static DEFINE_SPINLOCK(write_nodes_lock);
David Teiglande7fd4172006-01-18 09:30:29 +0000142
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000143
David Teiglande7fd4172006-01-18 09:30:29 +0000144/* Maximum number of incoming messages to process before
145 * doing a schedule()
146 */
147#define MAX_RX_MSG_COUNT 25
148
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000149/* Work queues */
150static struct workqueue_struct *recv_workqueue;
151static struct workqueue_struct *send_workqueue;
152static struct workqueue_struct *lock_workqueue;
David Teiglande7fd4172006-01-18 09:30:29 +0000153
154/* The SCTP connection */
155static struct connection sctp_con;
156
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000157static void process_send_sockets(struct work_struct *work);
158static void process_recv_sockets(struct work_struct *work);
159static void process_lock_request(struct work_struct *work);
David Teiglande7fd4172006-01-18 09:30:29 +0000160
161static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
162{
163 struct sockaddr_storage addr;
164 int error;
165
Steven Whitehouse47c962982006-05-25 17:43:14 -0400166 if (!dlm_local_count)
David Teiglande7fd4172006-01-18 09:30:29 +0000167 return -1;
168
169 error = dlm_nodeid_to_addr(nodeid, &addr);
170 if (error)
171 return error;
172
Steven Whitehouse47c962982006-05-25 17:43:14 -0400173 if (dlm_local_addr[0]->ss_family == AF_INET) {
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000174 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
David Teiglande7fd4172006-01-18 09:30:29 +0000175 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
176 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
177 } else {
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000178 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
David Teiglande7fd4172006-01-18 09:30:29 +0000179 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
180 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
181 sizeof(in6->sin6_addr));
182 }
183
184 return 0;
185}
186
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000187/* If alloc is 0 here we will not attempt to allocate a new
188 nodeinfo struct */
Al Viro38d6fd22006-10-09 20:27:30 +0100189static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
David Teiglande7fd4172006-01-18 09:30:29 +0000190{
191 struct nodeinfo *ni;
192 int r;
193 int n;
194
195 down_read(&nodeinfo_lock);
196 ni = idr_find(&nodeinfo_idr, nodeid);
197 up_read(&nodeinfo_lock);
198
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000199 if (ni || !alloc)
200 return ni;
David Teiglande7fd4172006-01-18 09:30:29 +0000201
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000202 down_write(&nodeinfo_lock);
David Teiglande7fd4172006-01-18 09:30:29 +0000203
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000204 ni = idr_find(&nodeinfo_idr, nodeid);
205 if (ni)
206 goto out_up;
David Teiglande7fd4172006-01-18 09:30:29 +0000207
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000208 r = idr_pre_get(&nodeinfo_idr, alloc);
209 if (!r)
210 goto out_up;
David Teiglande7fd4172006-01-18 09:30:29 +0000211
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000212 ni = kmalloc(sizeof(struct nodeinfo), alloc);
213 if (!ni)
214 goto out_up;
David Teiglande7fd4172006-01-18 09:30:29 +0000215
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000216 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
217 if (r) {
218 kfree(ni);
219 ni = NULL;
220 goto out_up;
David Teiglande7fd4172006-01-18 09:30:29 +0000221 }
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000222 if (n != nodeid) {
223 idr_remove(&nodeinfo_idr, n);
224 kfree(ni);
225 ni = NULL;
226 goto out_up;
227 }
228 memset(ni, 0, sizeof(struct nodeinfo));
229 spin_lock_init(&ni->lock);
230 INIT_LIST_HEAD(&ni->writequeue);
231 spin_lock_init(&ni->writequeue_lock);
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000232 INIT_WORK(&ni->lwork, process_lock_request);
233 INIT_WORK(&ni->swork, process_send_sockets);
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000234 ni->nodeid = nodeid;
235
236 if (nodeid > max_nodeid)
237 max_nodeid = nodeid;
238out_up:
239 up_write(&nodeinfo_lock);
David Teiglande7fd4172006-01-18 09:30:29 +0000240
241 return ni;
242}
243
244/* Don't call this too often... */
245static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
246{
247 int i;
248 struct nodeinfo *ni;
249
250 for (i=1; i<=max_nodeid; i++) {
251 ni = nodeid2nodeinfo(i, 0);
252 if (ni && ni->assoc_id == assoc)
253 return ni;
254 }
255 return NULL;
256}
257
258/* Data or notification available on socket */
259static void lowcomms_data_ready(struct sock *sk, int count_unused)
260{
David Teiglande7fd4172006-01-18 09:30:29 +0000261 if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000262 queue_work(recv_workqueue, &sctp_con.work);
David Teiglande7fd4172006-01-18 09:30:29 +0000263}
264
265
266/* Add the port number to an IP6 or 4 sockaddr and return the address length.
267 Also padd out the struct with zeros to make comparisons meaningful */
268
269static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
270 int *addr_len)
271{
272 struct sockaddr_in *local4_addr;
273 struct sockaddr_in6 *local6_addr;
274
Steven Whitehouse47c962982006-05-25 17:43:14 -0400275 if (!dlm_local_count)
David Teiglande7fd4172006-01-18 09:30:29 +0000276 return;
277
278 if (!port) {
Steven Whitehouse47c962982006-05-25 17:43:14 -0400279 if (dlm_local_addr[0]->ss_family == AF_INET) {
280 local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
David Teiglande7fd4172006-01-18 09:30:29 +0000281 port = be16_to_cpu(local4_addr->sin_port);
282 } else {
Steven Whitehouse47c962982006-05-25 17:43:14 -0400283 local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
David Teiglande7fd4172006-01-18 09:30:29 +0000284 port = be16_to_cpu(local6_addr->sin6_port);
285 }
286 }
287
Steven Whitehouse47c962982006-05-25 17:43:14 -0400288 saddr->ss_family = dlm_local_addr[0]->ss_family;
289 if (dlm_local_addr[0]->ss_family == AF_INET) {
David Teiglande7fd4172006-01-18 09:30:29 +0000290 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
291 in4_addr->sin_port = cpu_to_be16(port);
292 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
293 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000294 sizeof(struct sockaddr_in));
David Teiglande7fd4172006-01-18 09:30:29 +0000295 *addr_len = sizeof(struct sockaddr_in);
296 } else {
297 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
298 in6_addr->sin6_port = cpu_to_be16(port);
299 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000300 sizeof(struct sockaddr_in6));
David Teiglande7fd4172006-01-18 09:30:29 +0000301 *addr_len = sizeof(struct sockaddr_in6);
302 }
303}
304
305/* Close the connection and tidy up */
306static void close_connection(void)
307{
308 if (sctp_con.sock) {
309 sock_release(sctp_con.sock);
310 sctp_con.sock = NULL;
311 }
312
313 if (sctp_con.rx_page) {
314 __free_page(sctp_con.rx_page);
315 sctp_con.rx_page = NULL;
316 }
317}
318
319/* We only send shutdown messages to nodes that are not part of the cluster */
320static void send_shutdown(sctp_assoc_t associd)
321{
322 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
323 struct msghdr outmessage;
324 struct cmsghdr *cmsg;
325 struct sctp_sndrcvinfo *sinfo;
326 int ret;
327
328 outmessage.msg_name = NULL;
329 outmessage.msg_namelen = 0;
330 outmessage.msg_control = outcmsg;
331 outmessage.msg_controllen = sizeof(outcmsg);
332 outmessage.msg_flags = MSG_EOR;
333
334 cmsg = CMSG_FIRSTHDR(&outmessage);
335 cmsg->cmsg_level = IPPROTO_SCTP;
336 cmsg->cmsg_type = SCTP_SNDRCV;
337 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
338 outmessage.msg_controllen = cmsg->cmsg_len;
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000339 sinfo = CMSG_DATA(cmsg);
David Teiglande7fd4172006-01-18 09:30:29 +0000340 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
341
342 sinfo->sinfo_flags |= MSG_EOF;
343 sinfo->sinfo_assoc_id = associd;
344
345 ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
346
347 if (ret != 0)
348 log_print("send EOF to node failed: %d", ret);
349}
350
351
352/* INIT failed but we don't know which node...
353 restart INIT on all pending nodes */
354static void init_failed(void)
355{
356 int i;
357 struct nodeinfo *ni;
358
359 for (i=1; i<=max_nodeid; i++) {
360 ni = nodeid2nodeinfo(i, 0);
361 if (!ni)
362 continue;
363
364 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
365 ni->assoc_id = 0;
366 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
367 spin_lock_bh(&write_nodes_lock);
368 list_add_tail(&ni->write_list, &write_nodes);
369 spin_unlock_bh(&write_nodes_lock);
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000370 queue_work(send_workqueue, &ni->swork);
David Teiglande7fd4172006-01-18 09:30:29 +0000371 }
372 }
373 }
David Teiglande7fd4172006-01-18 09:30:29 +0000374}
375
376/* Something happened to an association */
377static void process_sctp_notification(struct msghdr *msg, char *buf)
378{
379 union sctp_notification *sn = (union sctp_notification *)buf;
380
381 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
382 switch (sn->sn_assoc_change.sac_state) {
383
384 case SCTP_COMM_UP:
385 case SCTP_RESTART:
386 {
387 /* Check that the new node is in the lockspace */
388 struct sctp_prim prim;
389 mm_segment_t fs;
390 int nodeid;
391 int prim_len, ret;
392 int addr_len;
393 struct nodeinfo *ni;
394
395 /* This seems to happen when we received a connection
396 * too early... or something... anyway, it happens but
397 * we always seem to get a real message too, see
398 * receive_from_sock */
399
400 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
401 log_print("COMM_UP for invalid assoc ID %d",
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000402 (int)sn->sn_assoc_change.sac_assoc_id);
David Teiglande7fd4172006-01-18 09:30:29 +0000403 init_failed();
404 return;
405 }
406 memset(&prim, 0, sizeof(struct sctp_prim));
407 prim_len = sizeof(struct sctp_prim);
408 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
409
410 fs = get_fs();
411 set_fs(get_ds());
412 ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000413 IPPROTO_SCTP,
414 SCTP_PRIMARY_ADDR,
415 (char*)&prim,
416 &prim_len);
David Teiglande7fd4172006-01-18 09:30:29 +0000417 set_fs(fs);
418 if (ret < 0) {
419 struct nodeinfo *ni;
420
421 log_print("getsockopt/sctp_primary_addr on "
422 "new assoc %d failed : %d",
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000423 (int)sn->sn_assoc_change.sac_assoc_id,
424 ret);
David Teiglande7fd4172006-01-18 09:30:29 +0000425
426 /* Retry INIT later */
427 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
428 if (ni)
429 clear_bit(NI_INIT_PENDING, &ni->flags);
430 return;
431 }
432 make_sockaddr(&prim.ssp_addr, 0, &addr_len);
433 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
434 log_print("reject connect from unknown addr");
435 send_shutdown(prim.ssp_assoc_id);
436 return;
437 }
438
439 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
440 if (!ni)
441 return;
442
443 /* Save the assoc ID */
David Teiglande7fd4172006-01-18 09:30:29 +0000444 ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
David Teiglande7fd4172006-01-18 09:30:29 +0000445
446 log_print("got new/restarted association %d nodeid %d",
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000447 (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
David Teiglande7fd4172006-01-18 09:30:29 +0000448
449 /* Send any pending writes */
450 clear_bit(NI_INIT_PENDING, &ni->flags);
451 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
452 spin_lock_bh(&write_nodes_lock);
453 list_add_tail(&ni->write_list, &write_nodes);
454 spin_unlock_bh(&write_nodes_lock);
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000455 queue_work(send_workqueue, &ni->swork);
David Teiglande7fd4172006-01-18 09:30:29 +0000456 }
David Teiglande7fd4172006-01-18 09:30:29 +0000457 }
458 break;
459
460 case SCTP_COMM_LOST:
461 case SCTP_SHUTDOWN_COMP:
462 {
463 struct nodeinfo *ni;
464
465 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
466 if (ni) {
467 spin_lock(&ni->lock);
468 ni->assoc_id = 0;
469 spin_unlock(&ni->lock);
470 }
471 }
472 break;
473
474 /* We don't know which INIT failed, so clear the PENDING flags
475 * on them all. if assoc_id is zero then it will then try
476 * again */
477
478 case SCTP_CANT_STR_ASSOC:
479 {
480 log_print("Can't start SCTP association - retrying");
481 init_failed();
482 }
483 break;
484
485 default:
486 log_print("unexpected SCTP assoc change id=%d state=%d",
487 (int)sn->sn_assoc_change.sac_assoc_id,
488 sn->sn_assoc_change.sac_state);
489 }
490 }
491}
492
493/* Data received from remote end */
494static int receive_from_sock(void)
495{
496 int ret = 0;
497 struct msghdr msg;
498 struct kvec iov[2];
499 unsigned len;
500 int r;
501 struct sctp_sndrcvinfo *sinfo;
502 struct cmsghdr *cmsg;
503 struct nodeinfo *ni;
504
505 /* These two are marginally too big for stack allocation, but this
506 * function is (currently) only called by dlm_recvd so static should be
507 * OK.
508 */
509 static struct sockaddr_storage msgname;
510 static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
511
512 if (sctp_con.sock == NULL)
513 goto out;
514
515 if (sctp_con.rx_page == NULL) {
516 /*
517 * This doesn't need to be atomic, but I think it should
518 * improve performance if it is.
519 */
520 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
521 if (sctp_con.rx_page == NULL)
522 goto out_resched;
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000523 cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
David Teiglande7fd4172006-01-18 09:30:29 +0000524 }
525
526 memset(&incmsg, 0, sizeof(incmsg));
527 memset(&msgname, 0, sizeof(msgname));
528
David Teiglande7fd4172006-01-18 09:30:29 +0000529 msg.msg_name = &msgname;
530 msg.msg_namelen = sizeof(msgname);
531 msg.msg_flags = 0;
532 msg.msg_control = incmsg;
533 msg.msg_controllen = sizeof(incmsg);
Patrick Caulfield42fb0082006-10-13 17:12:05 +0100534 msg.msg_iovlen = 1;
David Teiglande7fd4172006-01-18 09:30:29 +0000535
536 /* I don't see why this circular buffer stuff is necessary for SCTP
537 * which is a packet-based protocol, but the whole thing breaks under
538 * load without it! The overhead is minimal (and is in the TCP lowcomms
539 * anyway, of course) so I'll leave it in until I can figure out what's
540 * really happening.
541 */
542
543 /*
544 * iov[0] is the bit of the circular buffer between the current end
545 * point (cb.base + cb.len) and the end of the buffer.
546 */
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000547 iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
David Teiglande7fd4172006-01-18 09:30:29 +0000548 iov[0].iov_base = page_address(sctp_con.rx_page) +
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000549 cbuf_data(&sctp_con.cb);
David Teiglande7fd4172006-01-18 09:30:29 +0000550 iov[1].iov_len = 0;
551
552 /*
553 * iov[1] is the bit of the circular buffer between the start of the
554 * buffer and the start of the currently used section (cb.base)
555 */
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000556 if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
557 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
David Teiglande7fd4172006-01-18 09:30:29 +0000558 iov[1].iov_len = sctp_con.cb.base;
559 iov[1].iov_base = page_address(sctp_con.rx_page);
560 msg.msg_iovlen = 2;
561 }
562 len = iov[0].iov_len + iov[1].iov_len;
563
Patrick Caulfield4c5e1b12006-10-12 10:41:22 +0100564 r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
David Teiglande7fd4172006-01-18 09:30:29 +0000565 MSG_NOSIGNAL | MSG_DONTWAIT);
566 if (ret <= 0)
567 goto out_close;
568
569 msg.msg_control = incmsg;
570 msg.msg_controllen = sizeof(incmsg);
571 cmsg = CMSG_FIRSTHDR(&msg);
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000572 sinfo = CMSG_DATA(cmsg);
David Teiglande7fd4172006-01-18 09:30:29 +0000573
574 if (msg.msg_flags & MSG_NOTIFICATION) {
575 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
576 return 0;
577 }
578
579 /* Is this a new association ? */
580 ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
581 if (ni) {
582 ni->assoc_id = sinfo->sinfo_assoc_id;
583 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
584
585 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
586 spin_lock_bh(&write_nodes_lock);
587 list_add_tail(&ni->write_list, &write_nodes);
588 spin_unlock_bh(&write_nodes_lock);
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000589 queue_work(send_workqueue, &ni->swork);
David Teiglande7fd4172006-01-18 09:30:29 +0000590 }
David Teiglande7fd4172006-01-18 09:30:29 +0000591 }
592 }
593
594 /* INIT sends a message with length of 1 - ignore it */
595 if (r == 1)
596 return 0;
597
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000598 cbuf_add(&sctp_con.cb, ret);
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000599 // PJC: TODO: Add to node's workqueue....can we ??
David Teiglande7fd4172006-01-18 09:30:29 +0000600 ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
601 page_address(sctp_con.rx_page),
602 sctp_con.cb.base, sctp_con.cb.len,
603 PAGE_CACHE_SIZE);
604 if (ret < 0)
605 goto out_close;
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000606 cbuf_eat(&sctp_con.cb, ret);
David Teiglande7fd4172006-01-18 09:30:29 +0000607
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000608out:
David Teiglande7fd4172006-01-18 09:30:29 +0000609 ret = 0;
610 goto out_ret;
611
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000612out_resched:
David Teiglande7fd4172006-01-18 09:30:29 +0000613 lowcomms_data_ready(sctp_con.sock->sk, 0);
614 ret = 0;
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000615 cond_resched();
David Teiglande7fd4172006-01-18 09:30:29 +0000616 goto out_ret;
617
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000618out_close:
David Teiglande7fd4172006-01-18 09:30:29 +0000619 if (ret != -EAGAIN)
620 log_print("error reading from sctp socket: %d", ret);
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000621out_ret:
David Teiglande7fd4172006-01-18 09:30:29 +0000622 return ret;
623}
624
625/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
626static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
627{
628 mm_segment_t fs;
629 int result = 0;
630
631 fs = get_fs();
632 set_fs(get_ds());
633 if (num == 1)
634 result = sctp_con.sock->ops->bind(sctp_con.sock,
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000635 (struct sockaddr *) addr,
636 addr_len);
David Teiglande7fd4172006-01-18 09:30:29 +0000637 else
638 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000639 SCTP_SOCKOPT_BINDX_ADD,
640 (char *)addr, addr_len);
David Teiglande7fd4172006-01-18 09:30:29 +0000641 set_fs(fs);
642
643 if (result < 0)
644 log_print("Can't bind to port %d addr number %d",
David Teigland68c817a2007-01-09 09:41:48 -0600645 dlm_config.ci_tcp_port, num);
David Teiglande7fd4172006-01-18 09:30:29 +0000646
647 return result;
648}
649
650static void init_local(void)
651{
652 struct sockaddr_storage sas, *addr;
653 int i;
654
Steven Whitehouse47c962982006-05-25 17:43:14 -0400655 dlm_local_nodeid = dlm_our_nodeid();
David Teiglande7fd4172006-01-18 09:30:29 +0000656
657 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
658 if (dlm_our_addr(&sas, i))
659 break;
660
661 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
662 if (!addr)
663 break;
664 memcpy(addr, &sas, sizeof(*addr));
Steven Whitehouse47c962982006-05-25 17:43:14 -0400665 dlm_local_addr[dlm_local_count++] = addr;
David Teiglande7fd4172006-01-18 09:30:29 +0000666 }
667}
668
669/* Initialise SCTP socket and bind to all interfaces */
670static int init_sock(void)
671{
672 mm_segment_t fs;
673 struct socket *sock = NULL;
674 struct sockaddr_storage localaddr;
675 struct sctp_event_subscribe subscribe;
676 int result = -EINVAL, num = 1, i, addr_len;
677
Steven Whitehouse47c962982006-05-25 17:43:14 -0400678 if (!dlm_local_count) {
David Teiglande7fd4172006-01-18 09:30:29 +0000679 init_local();
Steven Whitehouse47c962982006-05-25 17:43:14 -0400680 if (!dlm_local_count) {
David Teiglande7fd4172006-01-18 09:30:29 +0000681 log_print("no local IP address has been set");
682 goto out;
683 }
684 }
685
Steven Whitehouse47c962982006-05-25 17:43:14 -0400686 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
David Teiglande7fd4172006-01-18 09:30:29 +0000687 IPPROTO_SCTP, &sock);
688 if (result < 0) {
689 log_print("Can't create comms socket, check SCTP is loaded");
690 goto out;
691 }
692
693 /* Listen for events */
694 memset(&subscribe, 0, sizeof(subscribe));
695 subscribe.sctp_data_io_event = 1;
696 subscribe.sctp_association_event = 1;
697 subscribe.sctp_send_failure_event = 1;
698 subscribe.sctp_shutdown_event = 1;
699 subscribe.sctp_partial_delivery_event = 1;
700
701 fs = get_fs();
702 set_fs(get_ds());
703 result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
704 (char *)&subscribe, sizeof(subscribe));
705 set_fs(fs);
706
707 if (result < 0) {
708 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
709 result);
710 goto create_delsock;
711 }
712
713 /* Init con struct */
714 sock->sk->sk_user_data = &sctp_con;
715 sctp_con.sock = sock;
716 sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
717
718 /* Bind to all interfaces. */
Steven Whitehouse47c962982006-05-25 17:43:14 -0400719 for (i = 0; i < dlm_local_count; i++) {
720 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
David Teigland68c817a2007-01-09 09:41:48 -0600721 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
David Teiglande7fd4172006-01-18 09:30:29 +0000722
723 result = add_bind_addr(&localaddr, addr_len, num);
724 if (result)
725 goto create_delsock;
726 ++num;
727 }
728
729 result = sock->ops->listen(sock, 5);
730 if (result < 0) {
731 log_print("Can't set socket listening");
732 goto create_delsock;
733 }
734
735 return 0;
736
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000737create_delsock:
David Teiglande7fd4172006-01-18 09:30:29 +0000738 sock_release(sock);
739 sctp_con.sock = NULL;
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000740out:
David Teiglande7fd4172006-01-18 09:30:29 +0000741 return result;
742}
743
744
Al Viro38d6fd22006-10-09 20:27:30 +0100745static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
David Teiglande7fd4172006-01-18 09:30:29 +0000746{
747 struct writequeue_entry *entry;
748
749 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
750 if (!entry)
751 return NULL;
752
753 entry->page = alloc_page(allocation);
754 if (!entry->page) {
755 kfree(entry);
756 return NULL;
757 }
758
759 entry->offset = 0;
760 entry->len = 0;
761 entry->end = 0;
762 entry->users = 0;
763
764 return entry;
765}
766
Al Viro38d6fd22006-10-09 20:27:30 +0100767void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
David Teiglande7fd4172006-01-18 09:30:29 +0000768{
769 struct writequeue_entry *e;
770 int offset = 0;
771 int users = 0;
772 struct nodeinfo *ni;
773
David Teiglande7fd4172006-01-18 09:30:29 +0000774 ni = nodeid2nodeinfo(nodeid, allocation);
775 if (!ni)
776 return NULL;
777
778 spin_lock(&ni->writequeue_lock);
779 e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000780 if ((&e->list == &ni->writequeue) ||
David Teiglande7fd4172006-01-18 09:30:29 +0000781 (PAGE_CACHE_SIZE - e->end < len)) {
782 e = NULL;
783 } else {
784 offset = e->end;
785 e->end += len;
786 users = e->users++;
787 }
788 spin_unlock(&ni->writequeue_lock);
789
790 if (e) {
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000791 got_one:
David Teiglande7fd4172006-01-18 09:30:29 +0000792 if (users == 0)
793 kmap(e->page);
794 *ppc = page_address(e->page) + offset;
795 return e;
796 }
797
798 e = new_writequeue_entry(allocation);
799 if (e) {
800 spin_lock(&ni->writequeue_lock);
801 offset = e->end;
802 e->end += len;
803 e->ni = ni;
804 users = e->users++;
805 list_add_tail(&e->list, &ni->writequeue);
806 spin_unlock(&ni->writequeue_lock);
807 goto got_one;
808 }
809 return NULL;
810}
811
812void dlm_lowcomms_commit_buffer(void *arg)
813{
814 struct writequeue_entry *e = (struct writequeue_entry *) arg;
815 int users;
816 struct nodeinfo *ni = e->ni;
817
David Teiglande7fd4172006-01-18 09:30:29 +0000818 spin_lock(&ni->writequeue_lock);
819 users = --e->users;
820 if (users)
821 goto out;
822 e->len = e->end - e->offset;
823 kunmap(e->page);
824 spin_unlock(&ni->writequeue_lock);
825
826 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
827 spin_lock_bh(&write_nodes_lock);
828 list_add_tail(&ni->write_list, &write_nodes);
829 spin_unlock_bh(&write_nodes_lock);
Patrick Caulfield1d6e8132007-01-15 14:33:34 +0000830
831 queue_work(send_workqueue, &ni->swork);
David Teiglande7fd4172006-01-18 09:30:29 +0000832 }
833 return;
834
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000835out:
David Teiglande7fd4172006-01-18 09:30:29 +0000836 spin_unlock(&ni->writequeue_lock);
837 return;
838}
839
840static void free_entry(struct writequeue_entry *e)
841{
842 __free_page(e->page);
843 kfree(e);
844}
845
846/* Initiate an SCTP association. In theory we could just use sendmsg() on
847 the first IP address and it should work, but this allows us to set up the
848 association before sending any valuable data that we can't afford to lose.
849 It also keeps the send path clean as it can now always use the association ID */
850static void initiate_association(int nodeid)
851{
852 struct sockaddr_storage rem_addr;
853 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
854 struct msghdr outmessage;
855 struct cmsghdr *cmsg;
856 struct sctp_sndrcvinfo *sinfo;
857 int ret;
858 int addrlen;
859 char buf[1];
860 struct kvec iov[1];
861 struct nodeinfo *ni;
862
863 log_print("Initiating association with node %d", nodeid);
864
865 ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
866 if (!ni)
867 return;
868
869 if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
870 log_print("no address for nodeid %d", nodeid);
871 return;
872 }
873
David Teigland68c817a2007-01-09 09:41:48 -0600874 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
David Teiglande7fd4172006-01-18 09:30:29 +0000875
876 outmessage.msg_name = &rem_addr;
877 outmessage.msg_namelen = addrlen;
878 outmessage.msg_control = outcmsg;
879 outmessage.msg_controllen = sizeof(outcmsg);
880 outmessage.msg_flags = MSG_EOR;
881
882 iov[0].iov_base = buf;
883 iov[0].iov_len = 1;
884
885 /* Real INIT messages seem to cause trouble. Just send a 1 byte message
886 we can afford to lose */
887 cmsg = CMSG_FIRSTHDR(&outmessage);
888 cmsg->cmsg_level = IPPROTO_SCTP;
889 cmsg->cmsg_type = SCTP_SNDRCV;
890 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000891 sinfo = CMSG_DATA(cmsg);
David Teiglande7fd4172006-01-18 09:30:29 +0000892 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
Steven Whitehouse47c962982006-05-25 17:43:14 -0400893 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
David Teiglande7fd4172006-01-18 09:30:29 +0000894
895 outmessage.msg_controllen = cmsg->cmsg_len;
896 ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
897 if (ret < 0) {
898 log_print("send INIT to node failed: %d", ret);
899 /* Try again later */
900 clear_bit(NI_INIT_PENDING, &ni->flags);
901 }
902}
903
904/* Send a message */
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000905static void send_to_sock(struct nodeinfo *ni)
David Teiglande7fd4172006-01-18 09:30:29 +0000906{
907 int ret = 0;
908 struct writequeue_entry *e;
909 int len, offset;
910 struct msghdr outmsg;
911 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
912 struct cmsghdr *cmsg;
913 struct sctp_sndrcvinfo *sinfo;
914 struct kvec iov;
915
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000916 /* See if we need to init an association before we start
David Teiglande7fd4172006-01-18 09:30:29 +0000917 sending precious messages */
918 spin_lock(&ni->lock);
919 if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
920 spin_unlock(&ni->lock);
921 initiate_association(ni->nodeid);
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000922 return;
David Teiglande7fd4172006-01-18 09:30:29 +0000923 }
924 spin_unlock(&ni->lock);
925
926 outmsg.msg_name = NULL; /* We use assoc_id */
927 outmsg.msg_namelen = 0;
928 outmsg.msg_control = outcmsg;
929 outmsg.msg_controllen = sizeof(outcmsg);
930 outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
931
932 cmsg = CMSG_FIRSTHDR(&outmsg);
933 cmsg->cmsg_level = IPPROTO_SCTP;
934 cmsg->cmsg_type = SCTP_SNDRCV;
935 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000936 sinfo = CMSG_DATA(cmsg);
David Teiglande7fd4172006-01-18 09:30:29 +0000937 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
Steven Whitehouse47c962982006-05-25 17:43:14 -0400938 sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
David Teiglande7fd4172006-01-18 09:30:29 +0000939 sinfo->sinfo_assoc_id = ni->assoc_id;
940 outmsg.msg_controllen = cmsg->cmsg_len;
941
942 spin_lock(&ni->writequeue_lock);
943 for (;;) {
944 if (list_empty(&ni->writequeue))
945 break;
946 e = list_entry(ni->writequeue.next, struct writequeue_entry,
947 list);
David Teiglande7fd4172006-01-18 09:30:29 +0000948 len = e->len;
949 offset = e->offset;
950 BUG_ON(len == 0 && e->users == 0);
951 spin_unlock(&ni->writequeue_lock);
David Teiglandfcc8abc2006-08-10 13:31:23 -0500952 kmap(e->page);
David Teiglande7fd4172006-01-18 09:30:29 +0000953
954 ret = 0;
955 if (len) {
956 iov.iov_base = page_address(e->page)+offset;
957 iov.iov_len = len;
958
959 ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
960 len);
961 if (ret == -EAGAIN) {
962 sctp_con.eagain_flag = 1;
963 goto out;
964 } else if (ret < 0)
965 goto send_error;
966 } else {
967 /* Don't starve people filling buffers */
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000968 cond_resched();
David Teiglande7fd4172006-01-18 09:30:29 +0000969 }
970
971 spin_lock(&ni->writequeue_lock);
972 e->offset += ret;
973 e->len -= ret;
974
975 if (e->len == 0 && e->users == 0) {
976 list_del(&e->list);
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000977 kunmap(e->page);
David Teiglande7fd4172006-01-18 09:30:29 +0000978 free_entry(e);
979 continue;
980 }
981 }
982 spin_unlock(&ni->writequeue_lock);
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000983out:
984 return;
David Teiglande7fd4172006-01-18 09:30:29 +0000985
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000986send_error:
David Teiglande7fd4172006-01-18 09:30:29 +0000987 log_print("Error sending to node %d %d", ni->nodeid, ret);
988 spin_lock(&ni->lock);
989 if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
990 ni->assoc_id = 0;
991 spin_unlock(&ni->lock);
992 initiate_association(ni->nodeid);
993 } else
994 spin_unlock(&ni->lock);
995
Patrick Caulfieldac33d072006-12-06 15:10:37 +0000996 return;
David Teiglande7fd4172006-01-18 09:30:29 +0000997}
998
999/* Try to send any messages that are pending */
1000static void process_output_queue(void)
1001{
1002 struct list_head *list;
1003 struct list_head *temp;
1004
1005 spin_lock_bh(&write_nodes_lock);
1006 list_for_each_safe(list, temp, &write_nodes) {
1007 struct nodeinfo *ni =
Patrick Caulfieldac33d072006-12-06 15:10:37 +00001008 list_entry(list, struct nodeinfo, write_list);
David Teiglande7fd4172006-01-18 09:30:29 +00001009 clear_bit(NI_WRITE_PENDING, &ni->flags);
1010 list_del(&ni->write_list);
1011
1012 spin_unlock_bh(&write_nodes_lock);
1013
1014 send_to_sock(ni);
1015 spin_lock_bh(&write_nodes_lock);
1016 }
1017 spin_unlock_bh(&write_nodes_lock);
1018}
1019
1020/* Called after we've had -EAGAIN and been woken up */
1021static void refill_write_queue(void)
1022{
1023 int i;
1024
1025 for (i=1; i<=max_nodeid; i++) {
1026 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1027
1028 if (ni) {
1029 if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
1030 spin_lock_bh(&write_nodes_lock);
1031 list_add_tail(&ni->write_list, &write_nodes);
1032 spin_unlock_bh(&write_nodes_lock);
1033 }
1034 }
1035 }
1036}
1037
1038static void clean_one_writequeue(struct nodeinfo *ni)
1039{
1040 struct list_head *list;
1041 struct list_head *temp;
1042
1043 spin_lock(&ni->writequeue_lock);
1044 list_for_each_safe(list, temp, &ni->writequeue) {
1045 struct writequeue_entry *e =
1046 list_entry(list, struct writequeue_entry, list);
1047 list_del(&e->list);
1048 free_entry(e);
1049 }
1050 spin_unlock(&ni->writequeue_lock);
1051}
1052
1053static void clean_writequeues(void)
1054{
1055 int i;
1056
1057 for (i=1; i<=max_nodeid; i++) {
1058 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1059 if (ni)
1060 clean_one_writequeue(ni);
1061 }
1062}
1063
1064
1065static void dealloc_nodeinfo(void)
1066{
1067 int i;
1068
1069 for (i=1; i<=max_nodeid; i++) {
1070 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1071 if (ni) {
1072 idr_remove(&nodeinfo_idr, i);
1073 kfree(ni);
1074 }
1075 }
1076}
1077
David Teigland1c032c02006-04-28 10:50:41 -04001078int dlm_lowcomms_close(int nodeid)
1079{
1080 struct nodeinfo *ni;
1081
1082 ni = nodeid2nodeinfo(nodeid, 0);
1083 if (!ni)
1084 return -1;
1085
1086 spin_lock(&ni->lock);
1087 if (ni->assoc_id) {
1088 ni->assoc_id = 0;
1089 /* Don't send shutdown here, sctp will just queue it
1090 till the node comes back up! */
1091 }
1092 spin_unlock(&ni->lock);
1093
1094 clean_one_writequeue(ni);
1095 clear_bit(NI_INIT_PENDING, &ni->flags);
1096 return 0;
1097}
1098
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001099// PJC: The work queue function for receiving.
1100static void process_recv_sockets(struct work_struct *work)
David Teiglande7fd4172006-01-18 09:30:29 +00001101{
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001102 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1103 int ret;
David Teiglande7fd4172006-01-18 09:30:29 +00001104 int count = 0;
1105
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001106 do {
1107 ret = receive_from_sock();
David Teiglande7fd4172006-01-18 09:30:29 +00001108
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001109 /* Don't starve out everyone else */
1110 if (++count >= MAX_RX_MSG_COUNT) {
1111 cond_resched();
1112 count = 0;
1113 }
1114 } while (!kthread_should_stop() && ret >=0);
David Teiglande7fd4172006-01-18 09:30:29 +00001115 }
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001116 cond_resched();
David Teiglande7fd4172006-01-18 09:30:29 +00001117}
1118
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001119// PJC: the work queue function for sending
1120static void process_send_sockets(struct work_struct *work)
David Teiglande7fd4172006-01-18 09:30:29 +00001121{
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001122 if (sctp_con.eagain_flag) {
1123 sctp_con.eagain_flag = 0;
1124 refill_write_queue();
David Teiglande7fd4172006-01-18 09:30:29 +00001125 }
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001126 process_output_queue();
1127}
David Teiglande7fd4172006-01-18 09:30:29 +00001128
/*
 * Work queue function meant to process lock requests from a particular
 * node.  Currently an empty stub.
 * TODO: can we optimise this out on UP ??
 */
static void process_lock_request(struct work_struct *work)
{
	/* Nothing to do yet */
}
1134
1135static void daemons_stop(void)
1136{
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001137 destroy_workqueue(recv_workqueue);
1138 destroy_workqueue(send_workqueue);
1139 destroy_workqueue(lock_workqueue);
David Teiglande7fd4172006-01-18 09:30:29 +00001140}
1141
1142static int daemons_start(void)
1143{
David Teiglande7fd4172006-01-18 09:30:29 +00001144 int error;
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001145 recv_workqueue = create_workqueue("dlm_recv");
1146 error = IS_ERR(recv_workqueue);
Patrick Caulfieldac33d072006-12-06 15:10:37 +00001147 if (error) {
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001148 log_print("can't start dlm_recv %d", error);
David Teiglande7fd4172006-01-18 09:30:29 +00001149 return error;
1150 }
David Teiglande7fd4172006-01-18 09:30:29 +00001151
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001152 send_workqueue = create_singlethread_workqueue("dlm_send");
1153 error = IS_ERR(send_workqueue);
Patrick Caulfieldac33d072006-12-06 15:10:37 +00001154 if (error) {
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001155 log_print("can't start dlm_send %d", error);
1156 destroy_workqueue(recv_workqueue);
David Teiglande7fd4172006-01-18 09:30:29 +00001157 return error;
1158 }
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001159
1160 lock_workqueue = create_workqueue("dlm_rlock");
1161 error = IS_ERR(lock_workqueue);
1162 if (error) {
1163 log_print("can't start dlm_rlock %d", error);
1164 destroy_workqueue(send_workqueue);
1165 destroy_workqueue(recv_workqueue);
1166 return error;
1167 }
David Teiglande7fd4172006-01-18 09:30:29 +00001168
1169 return 0;
1170}
1171
1172/*
1173 * This is quite likely to sleep...
1174 */
1175int dlm_lowcomms_start(void)
1176{
1177 int error;
1178
Patrick Caulfield1d6e8132007-01-15 14:33:34 +00001179 INIT_WORK(&sctp_con.work, process_recv_sockets);
1180
David Teiglande7fd4172006-01-18 09:30:29 +00001181 error = init_sock();
1182 if (error)
1183 goto fail_sock;
1184 error = daemons_start();
1185 if (error)
1186 goto fail_sock;
David Teiglande7fd4172006-01-18 09:30:29 +00001187 return 0;
1188
Patrick Caulfieldac33d072006-12-06 15:10:37 +00001189fail_sock:
David Teiglande7fd4172006-01-18 09:30:29 +00001190 close_connection();
1191 return error;
1192}
1193
David Teiglande7fd4172006-01-18 09:30:29 +00001194void dlm_lowcomms_stop(void)
1195{
Patrick Caulfieldac33d072006-12-06 15:10:37 +00001196 int i;
1197
David Teiglande7fd4172006-01-18 09:30:29 +00001198 sctp_con.flags = 0x7;
1199 daemons_stop();
1200 clean_writequeues();
1201 close_connection();
1202 dealloc_nodeinfo();
1203 max_nodeid = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001204
Patrick Caulfieldac33d072006-12-06 15:10:37 +00001205 dlm_local_count = 0;
1206 dlm_local_nodeid = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001207
Steven Whitehouse47c962982006-05-25 17:43:14 -04001208 for (i = 0; i < dlm_local_count; i++)
1209 kfree(dlm_local_addr[i]);
David Teiglande7fd4172006-01-18 09:30:29 +00001210}