blob: 80b8cce9cf3c7ddd90fca506f67067187a216006 [file] [log] [blame]
Kurt Hackel6714d8e2005-12-15 14:31:23 -08001/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmdomain.c
5 *
6 * defines domain join / leave apis
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 *
25 */
26
27#include <linux/module.h>
28#include <linux/types.h>
29#include <linux/slab.h>
30#include <linux/highmem.h>
31#include <linux/utsname.h>
32#include <linux/init.h>
33#include <linux/spinlock.h>
34#include <linux/delay.h>
35#include <linux/err.h>
36
37#include "cluster/heartbeat.h"
38#include "cluster/nodemanager.h"
39#include "cluster/tcp.h"
40
41#include "dlmapi.h"
42#include "dlmcommon.h"
43
44#include "dlmdebug.h"
45#include "dlmdomain.h"
46
47#include "dlmver.h"
48
49#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
50#include "cluster/masklog.h"
51
/* Free the pages held in a vector allocated by dlm_alloc_pagevec(),
 * then free the vector itself.  Safe for a partially filled vector as
 * long as the first 'pages' slots hold valid pages. */
static void dlm_free_pagevec(void **vec, int pages)
{
	int i;

	for (i = pages - 1; i >= 0; i--)
		free_page((unsigned long)vec[i]);
	kfree(vec);
}
58
59static void **dlm_alloc_pagevec(int pages)
60{
61 void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
62 int i;
63
64 if (!vec)
65 return NULL;
66
67 for (i = 0; i < pages; i++)
68 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
69 goto out_free;
70 return vec;
71out_free:
72 dlm_free_pagevec(vec, i);
73 return NULL;
74}
75
Kurt Hackel6714d8e2005-12-15 14:31:23 -080076/*
77 *
78 * spinlock lock ordering: if multiple locks are needed, obey this ordering:
79 * dlm_domain_lock
80 * struct dlm_ctxt->spinlock
81 * struct dlm_lock_resource->spinlock
82 * struct dlm_ctxt->master_lock
83 * struct dlm_ctxt->ast_lock
84 * dlm_master_list_entry->spinlock
85 * dlm_lock->spinlock
86 *
87 */
88
89spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED;
90LIST_HEAD(dlm_domains);
91static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
92
93#define DLM_DOMAIN_BACKOFF_MS 200
94
95static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
96static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
97static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
98static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
99
100static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
101
/* Remove a lock resource from the domain hash and drop the reference
 * the hash table held on it.  Caller must hold dlm->spinlock --
 * TODO confirm; this follows the file's "__" locked-caller convention. */
void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
{
	hlist_del_init(&lockres->hash_node);
	dlm_lockres_put(lockres);
}
107
/* Insert a lock resource into the domain hash table.  The hash takes
 * its own reference on the resource.  Caller must hold dlm->spinlock
 * (asserted below). */
void __dlm_insert_lockres(struct dlm_ctxt *dlm,
			  struct dlm_lock_resource *res)
{
	struct hlist_head *bucket;
	struct qstr *q;

	assert_spin_locked(&dlm->spinlock);

	/* the bucket is selected by the lockname's precomputed hash */
	q = &res->lockname;
	bucket = dlm_lockres_hash(dlm, q->hash);

	/* get a reference for our hashtable */
	dlm_lockres_get(res);

	hlist_add_head(&res->hash_node, bucket);
}
124
125struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
Mark Fasheha3d33292006-03-09 17:55:56 -0800126 const char *name,
127 unsigned int len,
128 unsigned int hash)
Kurt Hackel6714d8e2005-12-15 14:31:23 -0800129{
Mark Fasheh81f20942006-02-28 17:31:22 -0800130 struct hlist_head *bucket;
Daniel Phillips41989852006-03-10 13:31:47 -0800131 struct hlist_node *list;
Kurt Hackel6714d8e2005-12-15 14:31:23 -0800132
133 mlog_entry("%.*s\n", len, name);
134
135 assert_spin_locked(&dlm->spinlock);
136
Daniel Phillips03d864c2006-03-10 18:08:16 -0800137 bucket = dlm_lockres_hash(dlm, hash);
138
Daniel Phillips41989852006-03-10 13:31:47 -0800139 hlist_for_each(list, bucket) {
140 struct dlm_lock_resource *res = hlist_entry(list,
141 struct dlm_lock_resource, hash_node);
142 if (res->lockname.name[0] != name[0])
143 continue;
144 if (unlikely(res->lockname.len != len))
145 continue;
146 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
147 continue;
148 dlm_lockres_get(res);
149 return res;
Kurt Hackel6714d8e2005-12-15 14:31:23 -0800150 }
Daniel Phillips41989852006-03-10 13:31:47 -0800151 return NULL;
Kurt Hackel6714d8e2005-12-15 14:31:23 -0800152}
153
154struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
155 const char *name,
156 unsigned int len)
157{
158 struct dlm_lock_resource *res;
Mark Fasheha3d33292006-03-09 17:55:56 -0800159 unsigned int hash = dlm_lockid_hash(name, len);
Kurt Hackel6714d8e2005-12-15 14:31:23 -0800160
161 spin_lock(&dlm->spinlock);
Mark Fasheha3d33292006-03-09 17:55:56 -0800162 res = __dlm_lookup_lockres(dlm, name, len, hash);
Kurt Hackel6714d8e2005-12-15 14:31:23 -0800163 spin_unlock(&dlm->spinlock);
164 return res;
165}
166
167static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
168{
169 struct dlm_ctxt *tmp = NULL;
170 struct list_head *iter;
171
172 assert_spin_locked(&dlm_domain_lock);
173
174 /* tmp->name here is always NULL terminated,
175 * but domain may not be! */
176 list_for_each(iter, &dlm_domains) {
177 tmp = list_entry (iter, struct dlm_ctxt, list);
178 if (strlen(tmp->name) == len &&
179 memcmp(tmp->name, domain, len)==0)
180 break;
181 tmp = NULL;
182 }
183
184 return tmp;
185}
186
/* For null terminated domain strings ONLY.
 * Caller must hold dlm_domain_lock (asserted below). */
static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
{
	assert_spin_locked(&dlm_domain_lock);

	return __dlm_lookup_domain_full(domain, strlen(domain));
}
194
195
196/* returns true on one of two conditions:
197 * 1) the domain does not exist
198 * 2) the domain exists and it's state is "joined" */
199static int dlm_wait_on_domain_helper(const char *domain)
200{
201 int ret = 0;
202 struct dlm_ctxt *tmp = NULL;
203
204 spin_lock(&dlm_domain_lock);
205
206 tmp = __dlm_lookup_domain(domain);
207 if (!tmp)
208 ret = 1;
209 else if (tmp->dlm_state == DLM_CTXT_JOINED)
210 ret = 1;
211
212 spin_unlock(&dlm_domain_lock);
213 return ret;
214}
215
216static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
217{
Mark Fasheh81f20942006-02-28 17:31:22 -0800218 if (dlm->lockres_hash)
Daniel Phillips03d864c2006-03-10 18:08:16 -0800219 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
Kurt Hackel6714d8e2005-12-15 14:31:23 -0800220
221 if (dlm->name)
222 kfree(dlm->name);
223
224 kfree(dlm);
225}
226
/* A little strange - this function will be called while holding
 * dlm_domain_lock and is expected to be holding it on the way out. We
 * will however drop and reacquire it multiple times */
static void dlm_ctxt_release(struct kref *kref)
{
	struct dlm_ctxt *dlm;

	dlm = container_of(kref, struct dlm_ctxt, dlm_refs);

	/* the final reference must never drop while still joined */
	BUG_ON(dlm->num_joins);
	BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);

	/* we may still be in the list if we hit an error during join. */
	list_del_init(&dlm->list);

	/* drop the caller's lock across the free; see comment above */
	spin_unlock(&dlm_domain_lock);

	mlog(0, "freeing memory from domain %s\n", dlm->name);

	/* let anyone waiting on domain-list changes re-check */
	wake_up(&dlm_domain_events);

	dlm_free_ctxt_mem(dlm);

	spin_lock(&dlm_domain_lock);
}
252
/* Drop a reference on the domain ctxt.  The final put frees it via
 * dlm_ctxt_release(), which runs with dlm_domain_lock held. */
void dlm_put(struct dlm_ctxt *dlm)
{
	spin_lock(&dlm_domain_lock);
	kref_put(&dlm->dlm_refs, dlm_ctxt_release);
	spin_unlock(&dlm_domain_lock);
}
259
/* Take a reference on the domain ctxt.  Callers in this file invoke
 * this with dlm_domain_lock held (see dlm_grab()). */
static void __dlm_get(struct dlm_ctxt *dlm)
{
	kref_get(&dlm->dlm_refs);
}
264
265/* given a questionable reference to a dlm object, gets a reference if
266 * it can find it in the list, otherwise returns NULL in which case
267 * you shouldn't trust your pointer. */
268struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
269{
270 struct list_head *iter;
271 struct dlm_ctxt *target = NULL;
272
273 spin_lock(&dlm_domain_lock);
274
275 list_for_each(iter, &dlm_domains) {
276 target = list_entry (iter, struct dlm_ctxt, list);
277
278 if (target == dlm) {
279 __dlm_get(target);
280 break;
281 }
282
283 target = NULL;
284 }
285
286 spin_unlock(&dlm_domain_lock);
287
288 return target;
289}
290
291int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
292{
293 int ret;
294
295 spin_lock(&dlm_domain_lock);
296 ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
297 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
298 spin_unlock(&dlm_domain_lock);
299
300 return ret;
301}
302
/* Final stage of domain shutdown: tear down the network/heartbeat
 * handlers and worker threads, then unlink the ctxt from the global
 * list so the kref machinery can free it. */
static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
{
	dlm_unregister_domain_handlers(dlm);
	dlm_complete_thread(dlm);
	dlm_complete_recovery_thread(dlm);

	/* We've left the domain. Now we can take ourselves out of the
	 * list and allow the kref stuff to help us free the
	 * memory. */
	spin_lock(&dlm_domain_lock);
	list_del_init(&dlm->list);
	spin_unlock(&dlm_domain_lock);

	/* Wake up anyone waiting for us to remove this domain */
	wake_up(&dlm_domain_events);
}
319
/* Drain the lockres hash prior to leaving the domain: every resource
 * is purged (see dlm_purge_lockres), restarting the scan from the top
 * whenever a dirty resource has to be flushed by the dlm thread. */
static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
{
	int i;
	struct dlm_lock_resource *res;

	mlog(0, "Migrating locks from domain %s\n", dlm->name);
restart:
	spin_lock(&dlm->spinlock);
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
			res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
					  struct dlm_lock_resource, hash_node);
			/* need reference when manually grabbing lockres */
			dlm_lockres_get(res);
			/* this should unhash the lockres
			 * and exit with dlm->spinlock */
			mlog(0, "purging res=%p\n", res);
			if (dlm_lockres_is_dirty(dlm, res)) {
				/* HACK! this should absolutely go.
				 * need to figure out why some empty
				 * lockreses are still marked dirty */
				mlog(ML_ERROR, "lockres %.*s dirty!\n",
				     res->lockname.len, res->lockname.name);

				/* drop the lock, let the dlm thread
				 * flush it, then rescan from scratch */
				spin_unlock(&dlm->spinlock);
				dlm_kick_thread(dlm, res);
				wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
				dlm_lockres_put(res);
				goto restart;
			}
			dlm_purge_lockres(dlm, res);
			dlm_lockres_put(res);
		}
	}
	spin_unlock(&dlm->spinlock);

	mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
}
358
359static int dlm_no_joining_node(struct dlm_ctxt *dlm)
360{
361 int ret;
362
363 spin_lock(&dlm->spinlock);
364 ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
365 spin_unlock(&dlm->spinlock);
366
367 return ret;
368}
369
/* Transition the domain to DLM_CTXT_LEAVING, first waiting out any
 * node that is currently mid-join (the state change and the join
 * state live under two different locks). */
static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
{
	/* Yikes, a double spinlock! I need domain_lock for the dlm
	 * state and the dlm spinlock for join state... Sorry! */
again:
	spin_lock(&dlm_domain_lock);
	spin_lock(&dlm->spinlock);

	if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
		/* drop both locks before sleeping, then retry */
		mlog(0, "Node %d is joining, we wait on it.\n",
		     dlm->joining_node);
		spin_unlock(&dlm->spinlock);
		spin_unlock(&dlm_domain_lock);

		wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
		goto again;
	}

	dlm->dlm_state = DLM_CTXT_LEAVING;
	spin_unlock(&dlm->spinlock);
	spin_unlock(&dlm_domain_lock);
}
392
393static void __dlm_print_nodes(struct dlm_ctxt *dlm)
394{
395 int node = -1;
396
397 assert_spin_locked(&dlm->spinlock);
398
399 mlog(ML_NOTICE, "Nodes in my domain (\"%s\"):\n", dlm->name);
400
401 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
402 node + 1)) < O2NM_MAX_NODES) {
403 mlog(ML_NOTICE, " node %d\n", node);
404 }
405}
406
/* Network handler: another node announces it is leaving the domain.
 * Clear it from our domain map and notify heartbeat listeners.
 * Always returns 0 (the exit message carries no meaningful reply). */
static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	unsigned int node;
	struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;

	mlog_entry("%p %u %p", msg, len, data);

	/* bail quietly if the domain is already gone */
	if (!dlm_grab(dlm))
		return 0;

	node = exit_msg->node_idx;

	mlog(0, "Node %u leaves domain %s\n", node, dlm->name);

	spin_lock(&dlm->spinlock);
	clear_bit(node, dlm->domain_map);
	__dlm_print_nodes(dlm);

	/* notify anything attached to the heartbeat events */
	dlm_hb_event_notify_attached(dlm, node, 0);

	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);

	return 0;
}
435
436static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
437 unsigned int node)
438{
439 int status;
440 struct dlm_exit_domain leave_msg;
441
442 mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
443 node, dlm->name, dlm->node_num);
444
445 memset(&leave_msg, 0, sizeof(leave_msg));
446 leave_msg.node_idx = dlm->node_num;
447
448 status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
449 &leave_msg, sizeof(leave_msg), node,
450 NULL);
451
452 mlog(0, "status return %d from o2net_send_message\n", status);
453
454 return status;
455}
456
457
/* Broadcast our exit to every remaining domain member, retrying a
 * node (after a backoff) on unexpected send errors. */
static void dlm_leave_domain(struct dlm_ctxt *dlm)
{
	int node, clear_node, status;

	/* At this point we've migrated away all our locks and won't
	 * accept mastership of new ones. The dlm is responsible for
	 * almost nothing now. We make sure not to confuse any joining
	 * nodes and then commence shutdown procedure. */

	spin_lock(&dlm->spinlock);
	/* Clear ourselves from the domain map */
	clear_bit(dlm->node_num, dlm->domain_map);
	while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
				     0)) < O2NM_MAX_NODES) {
		/* Drop the dlm spinlock. This is safe wrt the domain_map.
		 * -nodes cannot be added now as the
		 * query_join_handlers knows to respond with OK_NO_MAP
		 * -we catch the right network errors if a node is
		 * removed from the map while we're sending him the
		 * exit message. */
		spin_unlock(&dlm->spinlock);

		clear_node = 1;

		status = dlm_send_one_domain_exit(dlm, node);
		/* -ENOPROTOOPT / -ENOTCONN mean the peer's dlm is not
		 * listening; treat those as a successful exit */
		if (status < 0 &&
		    status != -ENOPROTOOPT &&
		    status != -ENOTCONN) {
			mlog(ML_NOTICE, "Error %d sending domain exit message "
			     "to node %d\n", status, node);

			/* Not sure what to do here but lets sleep for
			 * a bit in case this was a transient
			 * error... */
			msleep(DLM_DOMAIN_BACKOFF_MS);
			clear_node = 0;
		}

		spin_lock(&dlm->spinlock);
		/* If we're not clearing the node bit then we intend
		 * to loop back around to try again. */
		if (clear_node)
			clear_bit(node, dlm->domain_map);
	}
	spin_unlock(&dlm->spinlock);
}
504
505int dlm_joined(struct dlm_ctxt *dlm)
506{
507 int ret = 0;
508
509 spin_lock(&dlm_domain_lock);
510
511 if (dlm->dlm_state == DLM_CTXT_JOINED)
512 ret = 1;
513
514 spin_unlock(&dlm_domain_lock);
515
516 return ret;
517}
518
519int dlm_shutting_down(struct dlm_ctxt *dlm)
520{
521 int ret = 0;
522
523 spin_lock(&dlm_domain_lock);
524
525 if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
526 ret = 1;
527
528 spin_unlock(&dlm_domain_lock);
529
530 return ret;
531}
532
/* Drop one join reference on the domain.  The last unregister drives
 * the full shutdown sequence: migrate every lockres away, mark the
 * domain leaving, broadcast the exit, and tear down threads/handlers.
 * Finally drops the ctxt reference taken at register time. */
void dlm_unregister_domain(struct dlm_ctxt *dlm)
{
	int leave = 0;

	spin_lock(&dlm_domain_lock);
	BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
	BUG_ON(!dlm->num_joins);

	dlm->num_joins--;
	if (!dlm->num_joins) {
		/* We mark it "in shutdown" now so new register
		 * requests wait until we've completely left the
		 * domain. Don't use DLM_CTXT_LEAVING yet as we still
		 * want new domain joins to communicate with us at
		 * least until we've completed migration of our
		 * resources. */
		dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
		leave = 1;
	}
	spin_unlock(&dlm_domain_lock);

	if (leave) {
		mlog(0, "shutting down domain %s\n", dlm->name);

		/* We changed dlm state, notify the thread */
		dlm_kick_thread(dlm, NULL);

		dlm_migrate_all_locks(dlm);
		dlm_mark_domain_leaving(dlm);
		dlm_leave_domain(dlm);
		dlm_complete_dlm_shutdown(dlm);
	}
	dlm_put(dlm);
}
EXPORT_SYMBOL_GPL(dlm_unregister_domain);
568
/* Network handler: a node asks to join a domain.  Returns one of the
 * dlm_query_join_response values as the message status: JOIN_OK when
 * we are a member and accept the joiner, JOIN_OK_NO_MAP when we have
 * no (usable) ctxt for that domain, JOIN_DISALLOW otherwise. */
static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_query_join_request *query;
	enum dlm_query_join_response response;
	struct dlm_ctxt *dlm = NULL;

	query = (struct dlm_query_join_request *) msg->buf;

	mlog(0, "node %u wants to join domain %s\n", query->node_idx,
	     query->domain);

	/*
	 * If heartbeat doesn't consider the node live, tell it
	 * to back off and try again.  This gives heartbeat a chance
	 * to catch up.
	 */
	if (!o2hb_check_node_heartbeating(query->node_idx)) {
		mlog(0, "node %u is not in our live map yet\n",
		     query->node_idx);

		response = JOIN_DISALLOW;
		goto respond;
	}

	response = JOIN_OK_NO_MAP;

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
	/* Once the dlm ctxt is marked as leaving then we don't want
	 * to be put in someone's domain map.
	 * Also, explicitly disallow joining at certain troublesome
	 * times (ie. during recovery). */
	if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
		int bit = query->node_idx;
		spin_lock(&dlm->spinlock);

		if (dlm->dlm_state == DLM_CTXT_NEW &&
		    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
			/*If this is a brand new context and we
			 * haven't started our join process yet, then
			 * the other node won the race. */
			response = JOIN_OK_NO_MAP;
		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* Disallow parallel joins. */
			response = JOIN_DISALLOW;
		} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
			/* joining mid-recovery would corrupt state */
			mlog(ML_NOTICE, "node %u trying to join, but recovery "
			     "is ongoing.\n", bit);
			response = JOIN_DISALLOW;
		} else if (test_bit(bit, dlm->recovery_map)) {
			mlog(ML_NOTICE, "node %u trying to join, but it "
			     "still needs recovery.\n", bit);
			response = JOIN_DISALLOW;
		} else if (test_bit(bit, dlm->domain_map)) {
			/* a rejoin before we cleaned up its last exit */
			mlog(ML_NOTICE, "node %u trying to join, but it "
			     "is still in the domain! needs recovery?\n",
			     bit);
			response = JOIN_DISALLOW;
		} else {
			/* Alright we're fully a part of this domain
			 * so we keep some state as to who's joining
			 * and indicate to him that needs to be fixed
			 * up. */
			response = JOIN_OK;
			__dlm_set_joining_node(dlm, query->node_idx);
		}

		spin_unlock(&dlm->spinlock);
	}
	spin_unlock(&dlm_domain_lock);

respond:
	mlog(0, "We respond with %u\n", response);

	return response;
}
645
/* Network handler: a node asserts that it has joined our domain.
 * Set it in the domain map and clear our pending-join bookkeeping.
 * Always returns 0. */
static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_assert_joined *assert;
	struct dlm_ctxt *dlm = NULL;

	assert = (struct dlm_assert_joined *) msg->buf;

	mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
	     assert->domain);

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
	/* XXX should we consider no dlm ctxt an error? */
	if (dlm) {
		spin_lock(&dlm->spinlock);

		/* Alright, this node has officially joined our
		 * domain. Set him in the map and clean up our
		 * leftover join state. */
		BUG_ON(dlm->joining_node != assert->node_idx);
		set_bit(assert->node_idx, dlm->domain_map);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);

		__dlm_print_nodes(dlm);

		/* notify anything attached to the heartbeat events */
		dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);

		spin_unlock(&dlm->spinlock);
	}
	spin_unlock(&dlm_domain_lock);

	return 0;
}
680
/* Network handler: a node abandons its join attempt.  Clear our
 * pending-join state for it.  Always returns 0. */
static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_cancel_join *cancel;
	struct dlm_ctxt *dlm = NULL;

	cancel = (struct dlm_cancel_join *) msg->buf;

	mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
	     cancel->domain);

	spin_lock(&dlm_domain_lock);
	dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);

	if (dlm) {
		spin_lock(&dlm->spinlock);

		/* Yikes, this guy wants to cancel his join. No
		 * problem, we simply cleanup our join state. */
		BUG_ON(dlm->joining_node != cancel->node_idx);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);

		spin_unlock(&dlm->spinlock);
	}
	spin_unlock(&dlm_domain_lock);

	return 0;
}
708
709static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
710 unsigned int node)
711{
712 int status;
713 struct dlm_cancel_join cancel_msg;
714
715 memset(&cancel_msg, 0, sizeof(cancel_msg));
716 cancel_msg.node_idx = dlm->node_num;
717 cancel_msg.name_len = strlen(dlm->name);
718 memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
719
720 status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
721 &cancel_msg, sizeof(cancel_msg), node,
722 NULL);
723 if (status < 0) {
724 mlog_errno(status);
725 goto bail;
726 }
727
728bail:
729 return status;
730}
731
/* map_size should be in bytes. */
/* Send a join-cancel to every node set in node_map except ourselves.
 * All nodes are attempted even after a failure; the first failing
 * status is returned (0 when every send succeeded). */
static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
				 unsigned long *node_map,
				 unsigned int map_size)
{
	int status, tmpstat;
	unsigned int node;

	/* sanity-check that the caller passed a full-sized bitmap */
	if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
			 sizeof(unsigned long))) {
		mlog(ML_ERROR,
		     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
		     map_size, BITS_TO_LONGS(O2NM_MAX_NODES));
		return -EINVAL;
	}

	status = 0;
	node = -1;
	while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
				     node + 1)) < O2NM_MAX_NODES) {
		if (node == dlm->node_num)
			continue;

		tmpstat = dlm_send_one_join_cancel(dlm, node);
		if (tmpstat) {
			mlog(ML_ERROR, "Error return %d cancelling join on "
			     "node %d\n", tmpstat, node);
			/* remember only the first error */
			if (!status)
				status = tmpstat;
		}
	}

	if (status)
		mlog_errno(status);
	return status;
}
768
/* Ask 'node' whether we may join the domain.  On success (return 0)
 * *response holds the peer's answer (JOIN_OK / JOIN_OK_NO_MAP /
 * JOIN_DISALLOW).  Returns a negative errno on send failure or when
 * the peer returns an unrecognized response code. */
static int dlm_request_join(struct dlm_ctxt *dlm,
			    int node,
			    enum dlm_query_join_response *response)
{
	int status, retval;
	struct dlm_query_join_request join_msg;

	mlog(0, "querying node %d\n", node);

	memset(&join_msg, 0, sizeof(join_msg));
	join_msg.node_idx = dlm->node_num;
	join_msg.name_len = strlen(dlm->name);
	memcpy(join_msg.domain, dlm->name, join_msg.name_len);

	/* retval receives the peer handler's return code and is only
	 * valid when the send itself succeeded */
	status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
				    sizeof(join_msg), node, &retval);
	if (status < 0 && status != -ENOPROTOOPT) {
		mlog_errno(status);
		goto bail;
	}

	/* -ENOPROTOOPT from the net code means the other side isn't
	   listening for our message type -- that's fine, it means
	   his dlm isn't up, so we can consider him a 'yes' but not
	   joined into the domain. */
	if (status == -ENOPROTOOPT) {
		status = 0;
		*response = JOIN_OK_NO_MAP;
	} else if (retval == JOIN_DISALLOW ||
		   retval == JOIN_OK ||
		   retval == JOIN_OK_NO_MAP) {
		*response = retval;
	} else {
		status = -EINVAL;
		mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
		     node);
	}

	mlog(0, "status %d, node %d response is %d\n", status, node,
	     *response);

bail:
	return status;
}
813
/* Tell 'node' that we have joined the domain.  Returns the o2net send
 * status (< 0 on error). */
static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
				    unsigned int node)
{
	int status;
	struct dlm_assert_joined assert_msg;

	mlog(0, "Sending join assert to node %u\n", node);

	memset(&assert_msg, 0, sizeof(assert_msg));
	assert_msg.node_idx = dlm->node_num;
	assert_msg.name_len = strlen(dlm->name);
	memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);

	status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
				    &assert_msg, sizeof(assert_msg), node,
				    NULL);
	if (status < 0)
		mlog_errno(status);

	return status;
}
835
/* Send a join assert to every node in node_map (except ourselves),
 * retrying each node until the message gets through or the node drops
 * out of the live map. */
static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
				  unsigned long *node_map)
{
	int status, node, live;

	status = 0;
	node = -1;
	while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
				     node + 1)) < O2NM_MAX_NODES) {
		if (node == dlm->node_num)
			continue;

		do {
			/* It is very important that this message be
			 * received so we spin until either the node
			 * has died or it gets the message. */
			status = dlm_send_one_join_assert(dlm, node);

			spin_lock(&dlm->spinlock);
			live = test_bit(node, dlm->live_nodes_map);
			spin_unlock(&dlm->spinlock);

			if (status) {
				mlog(ML_ERROR, "Error return %d asserting "
				     "join on node %d\n", status, node);

				/* give us some time between errors... */
				if (live)
					msleep(DLM_DOMAIN_BACKOFF_MS);
			}
		} while (status && live);
	}
}
869
/* Scratch state for one join attempt: the set of live nodes observed
 * when the attempt began, and the nodes that have answered JOIN_OK. */
struct domain_join_ctxt {
	unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
	unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
};
874
875static int dlm_should_restart_join(struct dlm_ctxt *dlm,
876 struct domain_join_ctxt *ctxt,
877 enum dlm_query_join_response response)
878{
879 int ret;
880
881 if (response == JOIN_DISALLOW) {
882 mlog(0, "Latest response of disallow -- should restart\n");
883 return 1;
884 }
885
886 spin_lock(&dlm->spinlock);
887 /* For now, we restart the process if the node maps have
888 * changed at all */
889 ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
890 sizeof(dlm->live_nodes_map));
891 spin_unlock(&dlm->spinlock);
892
893 if (ret)
894 mlog(0, "Node maps changed -- should restart\n");
895
896 return ret;
897}
898
899static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
900{
901 int status = 0, tmpstat, node;
902 struct domain_join_ctxt *ctxt;
903 enum dlm_query_join_response response;
904
905 mlog_entry("%p", dlm);
906
907 ctxt = kcalloc(1, sizeof(*ctxt), GFP_KERNEL);
908 if (!ctxt) {
909 status = -ENOMEM;
910 mlog_errno(status);
911 goto bail;
912 }
913
914 /* group sem locking should work for us here -- we're already
915 * registered for heartbeat events so filling this should be
916 * atomic wrt getting those handlers called. */
917 o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
918
919 spin_lock(&dlm->spinlock);
920 memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
921
922 __dlm_set_joining_node(dlm, dlm->node_num);
923
924 spin_unlock(&dlm->spinlock);
925
926 node = -1;
927 while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
928 node + 1)) < O2NM_MAX_NODES) {
929 if (node == dlm->node_num)
930 continue;
931
932 status = dlm_request_join(dlm, node, &response);
933 if (status < 0) {
934 mlog_errno(status);
935 goto bail;
936 }
937
938 /* Ok, either we got a response or the node doesn't have a
939 * dlm up. */
940 if (response == JOIN_OK)
941 set_bit(node, ctxt->yes_resp_map);
942
943 if (dlm_should_restart_join(dlm, ctxt, response)) {
944 status = -EAGAIN;
945 goto bail;
946 }
947 }
948
949 mlog(0, "Yay, done querying nodes!\n");
950
951 /* Yay, everyone agree's we can join the domain. My domain is
952 * comprised of all nodes who were put in the
953 * yes_resp_map. Copy that into our domain map and send a join
954 * assert message to clean up everyone elses state. */
955 spin_lock(&dlm->spinlock);
956 memcpy(dlm->domain_map, ctxt->yes_resp_map,
957 sizeof(ctxt->yes_resp_map));
958 set_bit(dlm->node_num, dlm->domain_map);
959 spin_unlock(&dlm->spinlock);
960
961 dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
962
963 /* Joined state *must* be set before the joining node
964 * information, otherwise the query_join handler may read no
965 * current joiner but a state of NEW and tell joining nodes
966 * we're not in the domain. */
967 spin_lock(&dlm_domain_lock);
968 dlm->dlm_state = DLM_CTXT_JOINED;
969 dlm->num_joins++;
970 spin_unlock(&dlm_domain_lock);
971
972bail:
973 spin_lock(&dlm->spinlock);
974 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
975 if (!status)
976 __dlm_print_nodes(dlm);
977 spin_unlock(&dlm->spinlock);
978
979 if (ctxt) {
980 /* Do we need to send a cancel message to any nodes? */
981 if (status < 0) {
982 tmpstat = dlm_send_join_cancels(dlm,
983 ctxt->yes_resp_map,
984 sizeof(ctxt->yes_resp_map));
985 if (tmpstat < 0)
986 mlog_errno(tmpstat);
987 }
988 kfree(ctxt);
989 }
990
991 mlog(0, "returning %d\n", status);
992 return status;
993}
994
/* Undo dlm_register_domain_handlers(): detach both heartbeat
 * callbacks and drop every o2net handler on this domain's list. */
static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
{
	o2hb_unregister_callback(&dlm->dlm_hb_up);
	o2hb_unregister_callback(&dlm->dlm_hb_down);
	o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
}
1001
/* Register the heartbeat callbacks and every o2net message handler
 * this domain needs.  The o2net handlers accumulate on
 * dlm->dlm_domain_handlers; on any failure everything registered so
 * far is torn down again (see bail) and the error is returned. */
static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
{
	int status;

	mlog(0, "registering handlers.\n");

	o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
			    dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
	status = o2hb_register_callback(&dlm->dlm_hb_down);
	if (status)
		goto bail;

	o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
			    dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
	status = o2hb_register_callback(&dlm->dlm_hb_up);
	if (status)
		goto bail;

	/* all handlers below are keyed by dlm->key so that messages
	 * only reach the matching domain */
	status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
					sizeof(struct dlm_master_request),
					dlm_master_request_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
					sizeof(struct dlm_assert_master),
					dlm_assert_master_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
					sizeof(struct dlm_create_lock),
					dlm_create_lock_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
					DLM_CONVERT_LOCK_MAX_LEN,
					dlm_convert_lock_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
					DLM_UNLOCK_LOCK_MAX_LEN,
					dlm_unlock_lock_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
					DLM_PROXY_AST_MAX_LEN,
					dlm_proxy_ast_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
					sizeof(struct dlm_exit_domain),
					dlm_exit_domain_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
					sizeof(struct dlm_migrate_request),
					dlm_migrate_request_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
					DLM_MIG_LOCKRES_MAX_LEN,
					dlm_mig_lockres_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
					sizeof(struct dlm_master_requery),
					dlm_master_requery_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
					sizeof(struct dlm_lock_request),
					dlm_request_all_locks_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
					sizeof(struct dlm_reco_data_done),
					dlm_reco_data_done_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
					sizeof(struct dlm_begin_reco),
					dlm_begin_reco_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

	status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
					sizeof(struct dlm_finalize_reco),
					dlm_finalize_reco_handler,
					dlm, &dlm->dlm_domain_handlers);
	if (status)
		goto bail;

bail:
	/* roll back partial registration on any failure */
	if (status)
		dlm_unregister_domain_handlers(dlm);

	return status;
}
1124
1125static int dlm_join_domain(struct dlm_ctxt *dlm)
1126{
1127 int status;
1128
1129 BUG_ON(!dlm);
1130
1131 mlog(0, "Join domain %s\n", dlm->name);
1132
1133 status = dlm_register_domain_handlers(dlm);
1134 if (status) {
1135 mlog_errno(status);
1136 goto bail;
1137 }
1138
1139 status = dlm_launch_thread(dlm);
1140 if (status < 0) {
1141 mlog_errno(status);
1142 goto bail;
1143 }
1144
1145 status = dlm_launch_recovery_thread(dlm);
1146 if (status < 0) {
1147 mlog_errno(status);
1148 goto bail;
1149 }
1150
1151 do {
1152 unsigned int backoff;
1153 status = dlm_try_to_join_domain(dlm);
1154
1155 /* If we're racing another node to the join, then we
1156 * need to back off temporarily and let them
1157 * complete. */
1158 if (status == -EAGAIN) {
1159 if (signal_pending(current)) {
1160 status = -ERESTARTSYS;
1161 goto bail;
1162 }
1163
1164 /*
1165 * <chip> After you!
1166 * <dale> No, after you!
1167 * <chip> I insist!
1168 * <dale> But you first!
1169 * ...
1170 */
1171 backoff = (unsigned int)(jiffies & 0x3);
1172 backoff *= DLM_DOMAIN_BACKOFF_MS;
1173 mlog(0, "backoff %d\n", backoff);
1174 msleep(backoff);
1175 }
1176 } while (status == -EAGAIN);
1177
1178 if (status < 0) {
1179 mlog_errno(status);
1180 goto bail;
1181 }
1182
1183 status = 0;
1184bail:
1185 wake_up(&dlm_domain_events);
1186
1187 if (status) {
1188 dlm_unregister_domain_handlers(dlm);
1189 dlm_complete_thread(dlm);
1190 dlm_complete_recovery_thread(dlm);
1191 }
1192
1193 return status;
1194}
1195
1196static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1197 u32 key)
1198{
1199 int i;
1200 struct dlm_ctxt *dlm = NULL;
1201
1202 dlm = kcalloc(1, sizeof(*dlm), GFP_KERNEL);
1203 if (!dlm) {
1204 mlog_errno(-ENOMEM);
1205 goto leave;
1206 }
1207
1208 dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
1209 if (dlm->name == NULL) {
1210 mlog_errno(-ENOMEM);
1211 kfree(dlm);
1212 dlm = NULL;
1213 goto leave;
1214 }
1215
Daniel Phillips03d864c2006-03-10 18:08:16 -08001216 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
Mark Fasheh81f20942006-02-28 17:31:22 -08001217 if (!dlm->lockres_hash) {
Kurt Hackel6714d8e2005-12-15 14:31:23 -08001218 mlog_errno(-ENOMEM);
1219 kfree(dlm->name);
1220 kfree(dlm);
1221 dlm = NULL;
1222 goto leave;
1223 }
Kurt Hackel6714d8e2005-12-15 14:31:23 -08001224
Daniel Phillips03d864c2006-03-10 18:08:16 -08001225 for (i = 0; i < DLM_HASH_BUCKETS; i++)
1226 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
Kurt Hackel6714d8e2005-12-15 14:31:23 -08001227
1228 strcpy(dlm->name, domain);
1229 dlm->key = key;
1230 dlm->node_num = o2nm_this_node();
1231
1232 spin_lock_init(&dlm->spinlock);
1233 spin_lock_init(&dlm->master_lock);
1234 spin_lock_init(&dlm->ast_lock);
1235 INIT_LIST_HEAD(&dlm->list);
1236 INIT_LIST_HEAD(&dlm->dirty_list);
1237 INIT_LIST_HEAD(&dlm->reco.resources);
1238 INIT_LIST_HEAD(&dlm->reco.received);
1239 INIT_LIST_HEAD(&dlm->reco.node_data);
1240 INIT_LIST_HEAD(&dlm->purge_list);
1241 INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1242 dlm->reco.state = 0;
1243
1244 INIT_LIST_HEAD(&dlm->pending_asts);
1245 INIT_LIST_HEAD(&dlm->pending_basts);
1246
1247 mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1248 dlm->recovery_map, &(dlm->recovery_map[0]));
1249
1250 memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1251 memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1252 memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1253
1254 dlm->dlm_thread_task = NULL;
1255 dlm->dlm_reco_thread_task = NULL;
1256 init_waitqueue_head(&dlm->dlm_thread_wq);
1257 init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1258 init_waitqueue_head(&dlm->reco.event);
1259 init_waitqueue_head(&dlm->ast_wq);
1260 init_waitqueue_head(&dlm->migration_wq);
1261 INIT_LIST_HEAD(&dlm->master_list);
1262 INIT_LIST_HEAD(&dlm->mle_hb_events);
1263
1264 dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1265 init_waitqueue_head(&dlm->dlm_join_events);
1266
1267 dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1268 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1269 atomic_set(&dlm->local_resources, 0);
1270 atomic_set(&dlm->remote_resources, 0);
1271 atomic_set(&dlm->unknown_resources, 0);
1272
1273 spin_lock_init(&dlm->work_lock);
1274 INIT_LIST_HEAD(&dlm->work_list);
1275 INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
1276
1277 kref_init(&dlm->dlm_refs);
1278 dlm->dlm_state = DLM_CTXT_NEW;
1279
1280 INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1281
1282 mlog(0, "context init: refcount %u\n",
1283 atomic_read(&dlm->dlm_refs.refcount));
1284
1285leave:
1286 return dlm;
1287}
1288
/*
 * dlm_register_domain: one-time setup per "domain"
 *
 * Look up an already-joined domain named @domain and take a new
 * reference on it, or allocate a fresh context and join the cluster.
 * Returns the dlm context on success or an ERR_PTR() on failure; the
 * caller drops its reference with dlm_unregister_domain().
 */
struct dlm_ctxt * dlm_register_domain(const char *domain,
			       u32 key)
{
	int ret;
	struct dlm_ctxt *dlm = NULL;
	struct dlm_ctxt *new_ctxt = NULL;

	if (strlen(domain) > O2NM_MAX_NAME_LEN) {
		ret = -ENAMETOOLONG;
		mlog(ML_ERROR, "domain name length too long\n");
		goto leave;
	}

	if (!o2hb_check_local_node_heartbeating()) {
		mlog(ML_ERROR, "the local node has not been configured, or is "
		     "not heartbeating\n");
		ret = -EPROTO;
		goto leave;
	}

	mlog(0, "register called for domain \"%s\"\n", domain);

retry:
	dlm = NULL;
	if (signal_pending(current)) {
		ret = -ERESTARTSYS;
		mlog_errno(ret);
		goto leave;
	}

	spin_lock(&dlm_domain_lock);

	dlm = __dlm_lookup_domain(domain);
	if (dlm) {
		if (dlm->dlm_state != DLM_CTXT_JOINED) {
			/* Another task created this ctxt but its join
			 * hasn't finished; sleep until dlm_join_domain()
			 * signals dlm_domain_events, then retry. */
			spin_unlock(&dlm_domain_lock);

			mlog(0, "This ctxt is not joined yet!\n");
			wait_event_interruptible(dlm_domain_events,
						 dlm_wait_on_domain_helper(
							 domain));
			goto retry;
		}

		__dlm_get(dlm);
		dlm->num_joins++;

		spin_unlock(&dlm_domain_lock);

		ret = 0;
		goto leave;
	}

	/* doesn't exist */
	if (!new_ctxt) {
		/* Allocate outside the spinlock, then retry the lookup
		 * in case someone raced us to create the domain. */
		spin_unlock(&dlm_domain_lock);

		new_ctxt = dlm_alloc_ctxt(domain, key);
		if (new_ctxt)
			goto retry;

		ret = -ENOMEM;
		mlog_errno(ret);
		goto leave;
	}

	/* a little variable switch-a-roo here... */
	dlm = new_ctxt;
	new_ctxt = NULL;

	/* add the new domain */
	list_add_tail(&dlm->list, &dlm_domains);
	spin_unlock(&dlm_domain_lock);

	/* Join happens outside dlm_domain_lock; concurrent callers see
	 * state != DLM_CTXT_JOINED and wait in the branch above. */
	ret = dlm_join_domain(dlm);
	if (ret) {
		mlog_errno(ret);
		dlm_put(dlm);
		goto leave;
	}

	ret = 0;
leave:
	/* new_ctxt is only still set if we lost the creation race. */
	if (new_ctxt)
		dlm_free_ctxt_mem(new_ctxt);

	if (ret < 0)
		dlm = ERR_PTR(ret);

	return dlm;
}
EXPORT_SYMBOL_GPL(dlm_register_domain);
1384
/* Handlers for the domain join protocol, registered once at module
 * init under DLM_MOD_KEY (module-wide, not per-domain). */
static LIST_HEAD(dlm_join_handlers);

/* Tear down every join-protocol handler registered by
 * dlm_register_net_handlers(). */
static void dlm_unregister_net_handlers(void)
{
	o2net_unregister_handler_list(&dlm_join_handlers);
}
1391
1392static int dlm_register_net_handlers(void)
1393{
1394 int status = 0;
1395
1396 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1397 sizeof(struct dlm_query_join_request),
1398 dlm_query_join_handler,
1399 NULL, &dlm_join_handlers);
1400 if (status)
1401 goto bail;
1402
1403 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1404 sizeof(struct dlm_assert_joined),
1405 dlm_assert_joined_handler,
1406 NULL, &dlm_join_handlers);
1407 if (status)
1408 goto bail;
1409
1410 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1411 sizeof(struct dlm_cancel_join),
1412 dlm_cancel_join_handler,
1413 NULL, &dlm_join_handlers);
1414
1415bail:
1416 if (status < 0)
1417 dlm_unregister_net_handlers();
1418
1419 return status;
1420}
1421
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
1429
/* Eviction is not expected to happen often, so a per-domain lock is
 * not necessary. Eviction callbacks are allowed to sleep for short
 * periods of time.
 * Readers: dlm_fire_domain_eviction_callbacks().  Writers: the
 * register/unregister functions below. */
static DECLARE_RWSEM(dlm_callback_sem);
1434
1435void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1436 int node_num)
1437{
1438 struct list_head *iter;
1439 struct dlm_eviction_cb *cb;
1440
1441 down_read(&dlm_callback_sem);
1442 list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1443 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1444
1445 cb->ec_func(node_num, cb->ec_data);
1446 }
1447 up_read(&dlm_callback_sem);
1448}
1449
1450void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1451 dlm_eviction_func *f,
1452 void *data)
1453{
1454 INIT_LIST_HEAD(&cb->ec_item);
1455 cb->ec_func = f;
1456 cb->ec_data = data;
1457}
1458EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1459
/* Add @cb (initialized via dlm_setup_eviction_cb()) to @dlm's eviction
 * callback list.  Takes the callback rwsem for writing to exclude a
 * concurrent dlm_fire_domain_eviction_callbacks(). */
void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
			      struct dlm_eviction_cb *cb)
{
	down_write(&dlm_callback_sem);
	list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
	up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1468
/* Remove @cb from its domain's eviction callback list.  list_del_init
 * leaves @cb reusable for a later dlm_register_eviction_cb(). */
void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
{
	down_write(&dlm_callback_sem);
	list_del_init(&cb->ec_item);
	up_write(&dlm_callback_sem);
}
EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1476
1477static int __init dlm_init(void)
1478{
1479 int status;
1480
1481 dlm_print_version();
1482
1483 status = dlm_init_mle_cache();
1484 if (status)
1485 return -1;
1486
1487 status = dlm_register_net_handlers();
1488 if (status) {
1489 dlm_destroy_mle_cache();
1490 return -1;
1491 }
1492
1493 return 0;
1494}
1495
/* Module teardown: undo dlm_init() in reverse order — unregister the
 * join-protocol handlers, then destroy the mle slab cache. */
static void __exit dlm_exit (void)
{
	dlm_unregister_net_handlers();
	dlm_destroy_mle_cache();
}
1501
MODULE_AUTHOR("Oracle");
MODULE_LICENSE("GPL");

/* Module entry/exit points; see dlm_init() / dlm_exit() above. */
module_init(dlm_init);
module_exit(dlm_exit);