blob: 021978e0576b641c8777a2ce2d8fc62f86e9f187 [file] [log] [blame]
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * vote.c
 *
 * Network voting between cluster nodes: broadcasting delete, unlink,
 * rename, mount and umount votes to the other mounted nodes, and the
 * vote thread which services incoming votes and blocked locks.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */
25
26#include <linux/types.h>
27#include <linux/slab.h>
28#include <linux/highmem.h>
29#include <linux/smp_lock.h>
30#include <linux/kthread.h>
31
32#include <cluster/heartbeat.h>
33#include <cluster/nodemanager.h>
34#include <cluster/tcp.h>
35
36#include <dlm/dlmapi.h>
37
38#define MLOG_MASK_PREFIX ML_VOTE
39#include <cluster/masklog.h>
40
41#include "ocfs2.h"
42
43#include "alloc.h"
44#include "dlmglue.h"
45#include "extent_map.h"
46#include "heartbeat.h"
47#include "inode.h"
48#include "journal.h"
49#include "slot_map.h"
50#include "vote.h"
51
52#include "buffer_head_io.h"
53
54#define OCFS2_MESSAGE_TYPE_VOTE (0x1)
55#define OCFS2_MESSAGE_TYPE_RESPONSE (0x2)
56struct ocfs2_msg_hdr
57{
58 __be32 h_response_id; /* used to lookup message handle on sending
59 * node. */
60 __be32 h_request;
61 __be64 h_blkno;
62 __be32 h_generation;
63 __be32 h_node_num; /* node sending this particular message. */
64};
65
66/* OCFS2_MAX_FILENAME_LEN is 255 characters, but we want to align this
67 * for the network. */
68#define OCFS2_VOTE_FILENAME_LEN 256
69struct ocfs2_vote_msg
70{
71 struct ocfs2_msg_hdr v_hdr;
72 union {
73 __be32 v_generic1;
74 __be32 v_orphaned_slot; /* Used during delete votes */
75 __be32 v_nlink; /* Used during unlink votes */
76 } md1; /* Message type dependant 1 */
77 __be32 v_unlink_namelen;
78 __be64 v_unlink_parent;
79 u8 v_unlink_dirent[OCFS2_VOTE_FILENAME_LEN];
80};
81
82/* Responses are given these values to maintain backwards
83 * compatibility with older ocfs2 versions */
84#define OCFS2_RESPONSE_OK (0)
85#define OCFS2_RESPONSE_BUSY (-16)
86#define OCFS2_RESPONSE_BAD_MSG (-22)
87
88struct ocfs2_response_msg
89{
90 struct ocfs2_msg_hdr r_hdr;
91 __be32 r_response;
92 __be32 r_orphaned_slot;
93};
94
95struct ocfs2_vote_work {
96 struct list_head w_list;
97 struct ocfs2_vote_msg w_msg;
98};
99
/*
 * Request types carried in h_request. INVALID and LAST are sentinels
 * which bracket the usable values.
 */
enum ocfs2_vote_request {
	OCFS2_VOTE_REQ_INVALID = 0,
	OCFS2_VOTE_REQ_DELETE,
	OCFS2_VOTE_REQ_UNLINK,
	OCFS2_VOTE_REQ_RENAME,
	OCFS2_VOTE_REQ_MOUNT,
	OCFS2_VOTE_REQ_UMOUNT,
	OCFS2_VOTE_REQ_LAST
};

/*
 * Returns 1 if request lies strictly between the INVALID and LAST
 * sentinels, 0 otherwise.
 */
static inline int ocfs2_is_valid_vote_request(int request)
{
	return request > OCFS2_VOTE_REQ_INVALID &&
	       request < OCFS2_VOTE_REQ_LAST;
}
115
/*
 * Optional per-response hook. It is invoked once for every response
 * message which matches the wait context it is attached to.
 */
typedef void (*ocfs2_net_response_callback)(void *priv,
					    struct ocfs2_response_msg *resp);

struct ocfs2_net_response_cb {
	ocfs2_net_response_callback	rc_cb;		/* function to call */
	void				*rc_priv;	/* passed as 'priv' */
};
122
123struct ocfs2_net_wait_ctxt {
124 struct list_head n_list;
125 u32 n_response_id;
126 wait_queue_head_t n_event;
127 struct ocfs2_node_map n_node_map;
128 int n_response; /* an agreggate response. 0 if
129 * all nodes are go, < 0 on any
130 * negative response from any
131 * node or network error. */
132 struct ocfs2_net_response_cb *n_callback;
133};
134
135static void ocfs2_process_mount_request(struct ocfs2_super *osb,
136 unsigned int node_num)
137{
138 mlog(0, "MOUNT vote from node %u\n", node_num);
139 /* The other node only sends us this message when he has an EX
140 * on the superblock, so our recovery threads (if having been
141 * launched) are waiting on it.*/
142 ocfs2_recovery_map_clear(osb, node_num);
143 ocfs2_node_map_set_bit(osb, &osb->mounted_map, node_num);
144
145 /* We clear the umount map here because a node may have been
146 * previously mounted, safely unmounted but never stopped
147 * heartbeating - in which case we'd have a stale entry. */
148 ocfs2_node_map_clear_bit(osb, &osb->umount_map, node_num);
149}
150
151static void ocfs2_process_umount_request(struct ocfs2_super *osb,
152 unsigned int node_num)
153{
154 mlog(0, "UMOUNT vote from node %u\n", node_num);
155 ocfs2_node_map_clear_bit(osb, &osb->mounted_map, node_num);
156 ocfs2_node_map_set_bit(osb, &osb->umount_map, node_num);
157}
158
159void ocfs2_mark_inode_remotely_deleted(struct inode *inode)
160{
161 struct ocfs2_inode_info *oi = OCFS2_I(inode);
162
163 assert_spin_locked(&oi->ip_lock);
164 /* We set the SKIP_DELETE flag on the inode so we don't try to
165 * delete it in delete_inode ourselves, thus avoiding
166 * unecessary lock pinging. If the other node failed to wipe
167 * the inode as a result of a crash, then recovery will pick
168 * up the slack. */
169 oi->ip_flags |= OCFS2_INODE_DELETED|OCFS2_INODE_SKIP_DELETE;
170}
171
172static int ocfs2_process_delete_request(struct inode *inode,
173 int *orphaned_slot)
174{
175 int response = OCFS2_RESPONSE_BUSY;
176
177 mlog(0, "DELETE vote on inode %lu, read lnk_cnt = %u, slot = %d\n",
178 inode->i_ino, inode->i_nlink, *orphaned_slot);
179
180 spin_lock(&OCFS2_I(inode)->ip_lock);
181
182 /* Whatever our vote response is, we want to make sure that
183 * the orphaned slot is recorded properly on this node *and*
184 * on the requesting node. Technically, if the requesting node
185 * did not know which slot the inode is orphaned in but we
186 * respond with BUSY he doesn't actually need the orphaned
187 * slot, but it doesn't hurt to do it here anyway. */
188 if ((*orphaned_slot) != OCFS2_INVALID_SLOT) {
189 mlog_bug_on_msg(OCFS2_I(inode)->ip_orphaned_slot !=
190 OCFS2_INVALID_SLOT &&
191 OCFS2_I(inode)->ip_orphaned_slot !=
192 (*orphaned_slot),
193 "Inode %"MLFu64": This node thinks it's "
194 "orphaned in slot %d, messaged it's in %d\n",
195 OCFS2_I(inode)->ip_blkno,
196 OCFS2_I(inode)->ip_orphaned_slot,
197 *orphaned_slot);
198
199 mlog(0, "Setting orphaned slot for inode %"MLFu64" to %d\n",
200 OCFS2_I(inode)->ip_blkno, *orphaned_slot);
201
202 OCFS2_I(inode)->ip_orphaned_slot = *orphaned_slot;
203 } else {
204 mlog(0, "Sending back orphaned slot %d for inode %"MLFu64"\n",
205 OCFS2_I(inode)->ip_orphaned_slot,
206 OCFS2_I(inode)->ip_blkno);
207
208 *orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot;
209 }
210
211 /* vote no if the file is still open. */
212 if (OCFS2_I(inode)->ip_open_count) {
213 mlog(0, "open count = %u\n",
214 OCFS2_I(inode)->ip_open_count);
215 spin_unlock(&OCFS2_I(inode)->ip_lock);
216 goto done;
217 }
218 spin_unlock(&OCFS2_I(inode)->ip_lock);
219
220 /* directories are a bit ugly... What if someone is sitting in
221 * it? We want to make sure the inode is removed completely as
222 * a result of the iput in process_vote. */
223 if (S_ISDIR(inode->i_mode) && (atomic_read(&inode->i_count) != 1)) {
224 mlog(0, "i_count = %u\n", atomic_read(&inode->i_count));
225 goto done;
226 }
227
228 if (filemap_fdatawrite(inode->i_mapping)) {
229 mlog(ML_ERROR, "Could not sync inode %"MLFu64" for delete!\n",
230 OCFS2_I(inode)->ip_blkno);
231 goto done;
232 }
233 sync_mapping_buffers(inode->i_mapping);
234 truncate_inode_pages(inode->i_mapping, 0);
235 ocfs2_extent_map_trunc(inode, 0);
236
237 spin_lock(&OCFS2_I(inode)->ip_lock);
238 /* double check open count - someone might have raced this
239 * thread into ocfs2_file_open while we were writing out
240 * data. If we're to allow a wipe of this inode now, we *must*
241 * hold the spinlock until we've marked it. */
242 if (OCFS2_I(inode)->ip_open_count) {
243 mlog(0, "Raced to wipe! open count = %u\n",
244 OCFS2_I(inode)->ip_open_count);
245 spin_unlock(&OCFS2_I(inode)->ip_lock);
246 goto done;
247 }
248
249 /* Mark the inode as being wiped from disk. */
250 ocfs2_mark_inode_remotely_deleted(inode);
251 spin_unlock(&OCFS2_I(inode)->ip_lock);
252
253 /* Not sure this is necessary anymore. */
254 d_prune_aliases(inode);
255
256 /* If we get here, then we're voting 'yes', so commit the
257 * delete on our side. */
258 response = OCFS2_RESPONSE_OK;
259done:
260 return response;
261}
262
263static int ocfs2_match_dentry(struct dentry *dentry,
264 u64 parent_blkno,
265 unsigned int namelen,
266 const char *name)
267{
268 struct inode *parent;
269
270 if (!dentry->d_parent) {
271 mlog(0, "Detached from parent.\n");
272 return 0;
273 }
274
275 parent = dentry->d_parent->d_inode;
276 /* Negative parent dentry? */
277 if (!parent)
278 return 0;
279
280 /* Name is in a different directory. */
281 if (OCFS2_I(parent)->ip_blkno != parent_blkno)
282 return 0;
283
284 if (dentry->d_name.len != namelen)
285 return 0;
286
287 /* comparison above guarantees this is safe. */
288 if (memcmp(dentry->d_name.name, name, namelen))
289 return 0;
290
291 return 1;
292}
293
294static void ocfs2_process_dentry_request(struct inode *inode,
295 int rename,
296 unsigned int new_nlink,
297 u64 parent_blkno,
298 unsigned int namelen,
299 const char *name)
300{
301 struct dentry *dentry = NULL;
302 struct list_head *p;
303 struct ocfs2_inode_info *oi = OCFS2_I(inode);
304
305 mlog(0, "parent %"MLFu64", namelen = %u, name = %.*s\n", parent_blkno,
306 namelen, namelen, name);
307
308 spin_lock(&dcache_lock);
309
310 /* Another node is removing this name from the system. It is
311 * up to us to find the corresponding dentry and if it exists,
312 * unhash it from the dcache. */
313 list_for_each(p, &inode->i_dentry) {
314 dentry = list_entry(p, struct dentry, d_alias);
315
316 if (ocfs2_match_dentry(dentry, parent_blkno, namelen, name)) {
317 mlog(0, "dentry found: %.*s\n",
318 dentry->d_name.len, dentry->d_name.name);
319
320 dget_locked(dentry);
321 break;
322 }
323
324 dentry = NULL;
325 }
326
327 spin_unlock(&dcache_lock);
328
329 if (dentry) {
330 d_delete(dentry);
331 dput(dentry);
332 }
333
334 /* rename votes don't send link counts */
335 if (!rename) {
336 mlog(0, "new_nlink = %u\n", new_nlink);
337
338 /* We don't have the proper locks here to directly
339 * change i_nlink and besides, the vote is sent
340 * *before* the operation so it may have failed on the
341 * other node. This passes a hint to ocfs2_drop_inode
342 * to force ocfs2_delete_inode, who will take the
343 * proper cluster locks to sort things out. */
344 if (new_nlink == 0) {
345 spin_lock(&oi->ip_lock);
346 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
347 spin_unlock(&OCFS2_I(inode)->ip_lock);
348 }
349 }
350}
351
352static void ocfs2_process_vote(struct ocfs2_super *osb,
353 struct ocfs2_vote_msg *msg)
354{
355 int net_status, vote_response;
356 int orphaned_slot = 0;
357 int rename = 0;
358 unsigned int node_num, generation, new_nlink, namelen;
359 u64 blkno, parent_blkno;
360 enum ocfs2_vote_request request;
361 struct inode *inode = NULL;
362 struct ocfs2_msg_hdr *hdr = &msg->v_hdr;
363 struct ocfs2_response_msg response;
364
365 /* decode the network mumbo jumbo into local variables. */
366 request = be32_to_cpu(hdr->h_request);
367 blkno = be64_to_cpu(hdr->h_blkno);
368 generation = be32_to_cpu(hdr->h_generation);
369 node_num = be32_to_cpu(hdr->h_node_num);
370 if (request == OCFS2_VOTE_REQ_DELETE)
371 orphaned_slot = be32_to_cpu(msg->md1.v_orphaned_slot);
372
373 mlog(0, "processing vote: request = %u, blkno = %"MLFu64", "
374 "generation = %u, node_num = %u, priv1 = %u\n", request,
375 blkno, generation, node_num, be32_to_cpu(msg->md1.v_generic1));
376
377 if (!ocfs2_is_valid_vote_request(request)) {
378 mlog(ML_ERROR, "Invalid vote request %d from node %u\n",
379 request, node_num);
380 vote_response = OCFS2_RESPONSE_BAD_MSG;
381 goto respond;
382 }
383
384 vote_response = OCFS2_RESPONSE_OK;
385
386 switch (request) {
387 case OCFS2_VOTE_REQ_UMOUNT:
388 ocfs2_process_umount_request(osb, node_num);
389 goto respond;
390 case OCFS2_VOTE_REQ_MOUNT:
391 ocfs2_process_mount_request(osb, node_num);
392 goto respond;
393 default:
394 /* avoids a gcc warning */
395 break;
396 }
397
398 /* We cannot process the remaining message types before we're
399 * fully mounted. It's perfectly safe however to send a 'yes'
400 * response as we can't possibly have any of the state they're
401 * asking us to modify yet. */
402 if (atomic_read(&osb->vol_state) == VOLUME_INIT)
403 goto respond;
404
405 /* If we get here, then the request is against an inode. */
406 inode = ocfs2_ilookup_for_vote(osb, blkno,
407 request == OCFS2_VOTE_REQ_DELETE);
408
409 /* Not finding the inode is perfectly valid - it means we're
410 * not interested in what the other node is about to do to it
411 * so in those cases we automatically respond with an
412 * affirmative. Cluster locking ensures that we won't race
413 * interest in the inode with this vote request. */
414 if (!inode)
415 goto respond;
416
417 /* Check generation values. It's possible for us to get a
418 * request against a stale inode. If so then we proceed as if
419 * we had not found an inode in the first place. */
420 if (inode->i_generation != generation) {
421 mlog(0, "generation passed %u != inode generation = %u, "
422 "ip_flags = %x, ip_blkno = %"MLFu64", msg %"MLFu64", "
423 "i_count = %u, message type = %u\n",
424 generation, inode->i_generation, OCFS2_I(inode)->ip_flags,
425 OCFS2_I(inode)->ip_blkno, blkno,
426 atomic_read(&inode->i_count), request);
427 iput(inode);
428 inode = NULL;
429 goto respond;
430 }
431
432 switch (request) {
433 case OCFS2_VOTE_REQ_DELETE:
434 vote_response = ocfs2_process_delete_request(inode,
435 &orphaned_slot);
436 break;
437 case OCFS2_VOTE_REQ_RENAME:
438 rename = 1;
439 /* fall through */
440 case OCFS2_VOTE_REQ_UNLINK:
441 parent_blkno = be64_to_cpu(msg->v_unlink_parent);
442 namelen = be32_to_cpu(msg->v_unlink_namelen);
443 /* new_nlink will be ignored in case of a rename vote */
444 new_nlink = be32_to_cpu(msg->md1.v_nlink);
445 ocfs2_process_dentry_request(inode, rename, new_nlink,
446 parent_blkno, namelen,
447 msg->v_unlink_dirent);
448 break;
449 default:
450 mlog(ML_ERROR, "node %u, invalid request: %u\n",
451 node_num, request);
452 vote_response = OCFS2_RESPONSE_BAD_MSG;
453 }
454
455respond:
456 /* Response struture is small so we just put it on the stack
457 * and stuff it inline. */
458 memset(&response, 0, sizeof(struct ocfs2_response_msg));
459 response.r_hdr.h_response_id = hdr->h_response_id;
460 response.r_hdr.h_blkno = hdr->h_blkno;
461 response.r_hdr.h_generation = hdr->h_generation;
462 response.r_hdr.h_node_num = cpu_to_be32(osb->node_num);
463 response.r_response = cpu_to_be32(vote_response);
464 response.r_orphaned_slot = cpu_to_be32(orphaned_slot);
465
466 net_status = o2net_send_message(OCFS2_MESSAGE_TYPE_RESPONSE,
467 osb->net_key,
468 &response,
469 sizeof(struct ocfs2_response_msg),
470 node_num,
471 NULL);
472 /* We still want to error print for ENOPROTOOPT here. The
473 * sending node shouldn't have unregistered his net handler
474 * without sending an unmount vote 1st */
475 if (net_status < 0
476 && net_status != -ETIMEDOUT
477 && net_status != -ENOTCONN)
478 mlog(ML_ERROR, "message to node %u fails with error %d!\n",
479 node_num, net_status);
480
481 if (inode)
482 iput(inode);
483}
484
485static void ocfs2_vote_thread_do_work(struct ocfs2_super *osb)
486{
487 unsigned long processed;
488 struct ocfs2_lock_res *lockres;
489 struct ocfs2_vote_work *work;
490
491 mlog_entry_void();
492
493 spin_lock(&osb->vote_task_lock);
494 /* grab this early so we know to try again if a state change and
495 * wake happens part-way through our work */
496 osb->vote_work_sequence = osb->vote_wake_sequence;
497
498 processed = osb->blocked_lock_count;
499 while (processed) {
500 BUG_ON(list_empty(&osb->blocked_lock_list));
501
502 lockres = list_entry(osb->blocked_lock_list.next,
503 struct ocfs2_lock_res, l_blocked_list);
504 list_del_init(&lockres->l_blocked_list);
505 osb->blocked_lock_count--;
506 spin_unlock(&osb->vote_task_lock);
507
508 BUG_ON(!processed);
509 processed--;
510
511 ocfs2_process_blocked_lock(osb, lockres);
512
513 spin_lock(&osb->vote_task_lock);
514 }
515
516 while (osb->vote_count) {
517 BUG_ON(list_empty(&osb->vote_list));
518 work = list_entry(osb->vote_list.next,
519 struct ocfs2_vote_work, w_list);
520 list_del(&work->w_list);
521 osb->vote_count--;
522 spin_unlock(&osb->vote_task_lock);
523
524 ocfs2_process_vote(osb, &work->w_msg);
525 kfree(work);
526
527 spin_lock(&osb->vote_task_lock);
528 }
529 spin_unlock(&osb->vote_task_lock);
530
531 mlog_exit_void();
532}
533
534static int ocfs2_vote_thread_lists_empty(struct ocfs2_super *osb)
535{
536 int empty = 0;
537
538 spin_lock(&osb->vote_task_lock);
539 if (list_empty(&osb->blocked_lock_list) &&
540 list_empty(&osb->vote_list))
541 empty = 1;
542
543 spin_unlock(&osb->vote_task_lock);
544 return empty;
545}
546
547static int ocfs2_vote_thread_should_wake(struct ocfs2_super *osb)
548{
549 int should_wake = 0;
550
551 spin_lock(&osb->vote_task_lock);
552 if (osb->vote_work_sequence != osb->vote_wake_sequence)
553 should_wake = 1;
554 spin_unlock(&osb->vote_task_lock);
555
556 return should_wake;
557}
558
559int ocfs2_vote_thread(void *arg)
560{
561 int status = 0;
562 struct ocfs2_super *osb = arg;
563
564 /* only quit once we've been asked to stop and there is no more
565 * work available */
566 while (!(kthread_should_stop() &&
567 ocfs2_vote_thread_lists_empty(osb))) {
568
569 wait_event_interruptible(osb->vote_event,
570 ocfs2_vote_thread_should_wake(osb) ||
571 kthread_should_stop());
572
573 mlog(0, "vote_thread: awoken\n");
574
575 ocfs2_vote_thread_do_work(osb);
576 }
577
578 osb->vote_task = NULL;
579 return status;
580}
581
582static struct ocfs2_net_wait_ctxt *ocfs2_new_net_wait_ctxt(unsigned int response_id)
583{
584 struct ocfs2_net_wait_ctxt *w;
585
586 w = kcalloc(1, sizeof(*w), GFP_KERNEL);
587 if (!w) {
588 mlog_errno(-ENOMEM);
589 goto bail;
590 }
591
592 INIT_LIST_HEAD(&w->n_list);
593 init_waitqueue_head(&w->n_event);
594 ocfs2_node_map_init(&w->n_node_map);
595 w->n_response_id = response_id;
596 w->n_callback = NULL;
597bail:
598 return w;
599}
600
601static unsigned int ocfs2_new_response_id(struct ocfs2_super *osb)
602{
603 unsigned int ret;
604
605 spin_lock(&osb->net_response_lock);
606 ret = ++osb->net_response_ids;
607 spin_unlock(&osb->net_response_lock);
608
609 return ret;
610}
611
612static void ocfs2_dequeue_net_wait_ctxt(struct ocfs2_super *osb,
613 struct ocfs2_net_wait_ctxt *w)
614{
615 spin_lock(&osb->net_response_lock);
616 list_del(&w->n_list);
617 spin_unlock(&osb->net_response_lock);
618}
619
620static void ocfs2_queue_net_wait_ctxt(struct ocfs2_super *osb,
621 struct ocfs2_net_wait_ctxt *w)
622{
623 spin_lock(&osb->net_response_lock);
624 list_add_tail(&w->n_list,
625 &osb->net_response_list);
626 spin_unlock(&osb->net_response_lock);
627}
628
629static void __ocfs2_mark_node_responded(struct ocfs2_super *osb,
630 struct ocfs2_net_wait_ctxt *w,
631 int node_num)
632{
633 assert_spin_locked(&osb->net_response_lock);
634
635 ocfs2_node_map_clear_bit(osb, &w->n_node_map, node_num);
636 if (ocfs2_node_map_is_empty(osb, &w->n_node_map))
637 wake_up(&w->n_event);
638}
639
640/* Intended to be called from the node down callback, we fake remove
641 * the node from all our response contexts */
642void ocfs2_remove_node_from_vote_queues(struct ocfs2_super *osb,
643 int node_num)
644{
645 struct list_head *p;
646 struct ocfs2_net_wait_ctxt *w = NULL;
647
648 spin_lock(&osb->net_response_lock);
649
650 list_for_each(p, &osb->net_response_list) {
651 w = list_entry(p, struct ocfs2_net_wait_ctxt, n_list);
652
653 __ocfs2_mark_node_responded(osb, w, node_num);
654 }
655
656 spin_unlock(&osb->net_response_lock);
657}
658
659static int ocfs2_broadcast_vote(struct ocfs2_super *osb,
660 struct ocfs2_vote_msg *request,
661 unsigned int response_id,
662 int *response,
663 struct ocfs2_net_response_cb *callback)
664{
665 int status, i, remote_err;
666 struct ocfs2_net_wait_ctxt *w = NULL;
667 int dequeued = 0;
668
669 mlog_entry_void();
670
671 w = ocfs2_new_net_wait_ctxt(response_id);
672 if (!w) {
673 status = -ENOMEM;
674 mlog_errno(status);
675 goto bail;
676 }
677 w->n_callback = callback;
678
679 /* we're pretty much ready to go at this point, and this fills
680 * in n_response which we need anyway... */
681 ocfs2_queue_net_wait_ctxt(osb, w);
682
683 i = ocfs2_node_map_iterate(osb, &osb->mounted_map, 0);
684
685 while (i != O2NM_INVALID_NODE_NUM) {
686 if (i != osb->node_num) {
687 mlog(0, "trying to send request to node %i\n", i);
688 ocfs2_node_map_set_bit(osb, &w->n_node_map, i);
689
690 remote_err = 0;
691 status = o2net_send_message(OCFS2_MESSAGE_TYPE_VOTE,
692 osb->net_key,
693 request,
694 sizeof(*request),
695 i,
696 &remote_err);
697 if (status == -ETIMEDOUT) {
698 mlog(0, "remote node %d timed out!\n", i);
699 status = -EAGAIN;
700 goto bail;
701 }
702 if (remote_err < 0) {
703 status = remote_err;
704 mlog(0, "remote error %d on node %d!\n",
705 remote_err, i);
706 mlog_errno(status);
707 goto bail;
708 }
709 if (status < 0) {
710 mlog_errno(status);
711 goto bail;
712 }
713 }
714 i++;
715 i = ocfs2_node_map_iterate(osb, &osb->mounted_map, i);
716 mlog(0, "next is %d, i am %d\n", i, osb->node_num);
717 }
718 mlog(0, "done sending, now waiting on responses...\n");
719
720 wait_event(w->n_event, ocfs2_node_map_is_empty(osb, &w->n_node_map));
721
722 ocfs2_dequeue_net_wait_ctxt(osb, w);
723 dequeued = 1;
724
725 *response = w->n_response;
726 status = 0;
727bail:
728 if (w) {
729 if (!dequeued)
730 ocfs2_dequeue_net_wait_ctxt(osb, w);
731 kfree(w);
732 }
733
734 mlog_exit(status);
735 return status;
736}
737
738static struct ocfs2_vote_msg * ocfs2_new_vote_request(struct ocfs2_super *osb,
739 u64 blkno,
740 unsigned int generation,
741 enum ocfs2_vote_request type,
742 u32 priv)
743{
744 struct ocfs2_vote_msg *request;
745 struct ocfs2_msg_hdr *hdr;
746
747 BUG_ON(!ocfs2_is_valid_vote_request(type));
748
749 request = kcalloc(1, sizeof(*request), GFP_KERNEL);
750 if (!request) {
751 mlog_errno(-ENOMEM);
752 } else {
753 hdr = &request->v_hdr;
754 hdr->h_node_num = cpu_to_be32(osb->node_num);
755 hdr->h_request = cpu_to_be32(type);
756 hdr->h_blkno = cpu_to_be64(blkno);
757 hdr->h_generation = cpu_to_be32(generation);
758
759 request->md1.v_generic1 = cpu_to_be32(priv);
760 }
761
762 return request;
763}
764
765/* Complete the buildup of a new vote request and process the
766 * broadcast return value. */
767static int ocfs2_do_request_vote(struct ocfs2_super *osb,
768 struct ocfs2_vote_msg *request,
769 struct ocfs2_net_response_cb *callback)
770{
771 int status, response;
772 unsigned int response_id;
773 struct ocfs2_msg_hdr *hdr;
774
775 response_id = ocfs2_new_response_id(osb);
776
777 hdr = &request->v_hdr;
778 hdr->h_response_id = cpu_to_be32(response_id);
779
780 status = ocfs2_broadcast_vote(osb, request, response_id, &response,
781 callback);
782 if (status < 0) {
783 mlog_errno(status);
784 goto bail;
785 }
786
787 status = response;
788bail:
789
790 return status;
791}
792
793static int ocfs2_request_vote(struct inode *inode,
794 struct ocfs2_vote_msg *request,
795 struct ocfs2_net_response_cb *callback)
796{
797 int status;
798 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
799
800 if (ocfs2_inode_is_new(inode))
801 return 0;
802
803 status = -EAGAIN;
804 while (status == -EAGAIN) {
805 if (!(osb->s_mount_opt & OCFS2_MOUNT_NOINTR) &&
806 signal_pending(current))
807 return -ERESTARTSYS;
808
809 status = ocfs2_super_lock(osb, 0);
810 if (status < 0) {
811 mlog_errno(status);
812 break;
813 }
814
815 status = 0;
816 if (!ocfs2_node_map_is_only(osb, &osb->mounted_map,
817 osb->node_num))
818 status = ocfs2_do_request_vote(osb, request, callback);
819
820 ocfs2_super_unlock(osb, 0);
821 }
822 return status;
823}
824
825static void ocfs2_delete_response_cb(void *priv,
826 struct ocfs2_response_msg *resp)
827{
828 int orphaned_slot, node;
829 struct inode *inode = priv;
830
831 orphaned_slot = be32_to_cpu(resp->r_orphaned_slot);
832 node = be32_to_cpu(resp->r_hdr.h_node_num);
833 mlog(0, "node %d tells us that inode %"MLFu64" is orphaned in slot "
834 "%d\n", node, OCFS2_I(inode)->ip_blkno, orphaned_slot);
835
836 /* The other node may not actually know which slot the inode
837 * is orphaned in. */
838 if (orphaned_slot == OCFS2_INVALID_SLOT)
839 return;
840
841 /* Ok, the responding node knows which slot this inode is
842 * orphaned in. We verify that the information is correct and
843 * then record this in the inode. ocfs2_delete_inode will use
844 * this information to determine which lock to take. */
845 spin_lock(&OCFS2_I(inode)->ip_lock);
846 mlog_bug_on_msg(OCFS2_I(inode)->ip_orphaned_slot != orphaned_slot &&
847 OCFS2_I(inode)->ip_orphaned_slot
848 != OCFS2_INVALID_SLOT, "Inode %"MLFu64": Node %d "
849 "says it's orphaned in slot %d, we think it's in %d\n",
850 OCFS2_I(inode)->ip_blkno,
851 be32_to_cpu(resp->r_hdr.h_node_num),
852 orphaned_slot, OCFS2_I(inode)->ip_orphaned_slot);
853
854 OCFS2_I(inode)->ip_orphaned_slot = orphaned_slot;
855 spin_unlock(&OCFS2_I(inode)->ip_lock);
856}
857
858int ocfs2_request_delete_vote(struct inode *inode)
859{
860 int orphaned_slot, status;
861 struct ocfs2_net_response_cb delete_cb;
862 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
863 struct ocfs2_vote_msg *request;
864
865 spin_lock(&OCFS2_I(inode)->ip_lock);
866 orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot;
867 spin_unlock(&OCFS2_I(inode)->ip_lock);
868
869 delete_cb.rc_cb = ocfs2_delete_response_cb;
870 delete_cb.rc_priv = inode;
871
872 mlog(0, "Inode %"MLFu64", we start thinking orphaned slot is %d\n",
873 OCFS2_I(inode)->ip_blkno, orphaned_slot);
874
875 status = -ENOMEM;
876 request = ocfs2_new_vote_request(osb, OCFS2_I(inode)->ip_blkno,
877 inode->i_generation,
878 OCFS2_VOTE_REQ_DELETE, orphaned_slot);
879 if (request) {
880 status = ocfs2_request_vote(inode, request, &delete_cb);
881
882 kfree(request);
883 }
884
885 return status;
886}
887
888static void ocfs2_setup_unlink_vote(struct ocfs2_vote_msg *request,
889 struct dentry *dentry)
890{
891 struct inode *parent = dentry->d_parent->d_inode;
892
893 /* We need some values which will uniquely identify a dentry
894 * on the other nodes so that they can find it and run
895 * d_delete against it. Parent directory block and full name
896 * should suffice. */
897
898 mlog(0, "unlink/rename request: parent: %"MLFu64" name: %.*s\n",
899 OCFS2_I(parent)->ip_blkno, dentry->d_name.len,
900 dentry->d_name.name);
901
902 request->v_unlink_parent = cpu_to_be64(OCFS2_I(parent)->ip_blkno);
903 request->v_unlink_namelen = cpu_to_be32(dentry->d_name.len);
904 memcpy(request->v_unlink_dirent, dentry->d_name.name,
905 dentry->d_name.len);
906}
907
908int ocfs2_request_unlink_vote(struct inode *inode,
909 struct dentry *dentry,
910 unsigned int nlink)
911{
912 int status;
913 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
914 struct ocfs2_vote_msg *request;
915
916 if (dentry->d_name.len > OCFS2_VOTE_FILENAME_LEN)
917 return -ENAMETOOLONG;
918
919 status = -ENOMEM;
920 request = ocfs2_new_vote_request(osb, OCFS2_I(inode)->ip_blkno,
921 inode->i_generation,
922 OCFS2_VOTE_REQ_UNLINK, nlink);
923 if (request) {
924 ocfs2_setup_unlink_vote(request, dentry);
925
926 status = ocfs2_request_vote(inode, request, NULL);
927
928 kfree(request);
929 }
930 return status;
931}
932
933int ocfs2_request_rename_vote(struct inode *inode,
934 struct dentry *dentry)
935{
936 int status;
937 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
938 struct ocfs2_vote_msg *request;
939
940 if (dentry->d_name.len > OCFS2_VOTE_FILENAME_LEN)
941 return -ENAMETOOLONG;
942
943 status = -ENOMEM;
944 request = ocfs2_new_vote_request(osb, OCFS2_I(inode)->ip_blkno,
945 inode->i_generation,
946 OCFS2_VOTE_REQ_RENAME, 0);
947 if (request) {
948 ocfs2_setup_unlink_vote(request, dentry);
949
950 status = ocfs2_request_vote(inode, request, NULL);
951
952 kfree(request);
953 }
954 return status;
955}
956
957int ocfs2_request_mount_vote(struct ocfs2_super *osb)
958{
959 int status;
960 struct ocfs2_vote_msg *request = NULL;
961
962 request = ocfs2_new_vote_request(osb, 0ULL, 0,
963 OCFS2_VOTE_REQ_MOUNT, 0);
964 if (!request) {
965 status = -ENOMEM;
966 goto bail;
967 }
968
969 status = -EAGAIN;
970 while (status == -EAGAIN) {
971 if (!(osb->s_mount_opt & OCFS2_MOUNT_NOINTR) &&
972 signal_pending(current)) {
973 status = -ERESTARTSYS;
974 goto bail;
975 }
976
977 if (ocfs2_node_map_is_only(osb, &osb->mounted_map,
978 osb->node_num)) {
979 status = 0;
980 goto bail;
981 }
982
983 status = ocfs2_do_request_vote(osb, request, NULL);
984 }
985
986bail:
987 if (request)
988 kfree(request);
989
990 return status;
991}
992
993int ocfs2_request_umount_vote(struct ocfs2_super *osb)
994{
995 int status;
996 struct ocfs2_vote_msg *request = NULL;
997
998 request = ocfs2_new_vote_request(osb, 0ULL, 0,
999 OCFS2_VOTE_REQ_UMOUNT, 0);
1000 if (!request) {
1001 status = -ENOMEM;
1002 goto bail;
1003 }
1004
1005 status = -EAGAIN;
1006 while (status == -EAGAIN) {
1007 /* Do not check signals on this vote... We really want
1008 * this one to go all the way through. */
1009
1010 if (ocfs2_node_map_is_only(osb, &osb->mounted_map,
1011 osb->node_num)) {
1012 status = 0;
1013 goto bail;
1014 }
1015
1016 status = ocfs2_do_request_vote(osb, request, NULL);
1017 }
1018
1019bail:
1020 if (request)
1021 kfree(request);
1022
1023 return status;
1024}
1025
1026/* TODO: This should eventually be a hash table! */
1027static struct ocfs2_net_wait_ctxt * __ocfs2_find_net_wait_ctxt(struct ocfs2_super *osb,
1028 u32 response_id)
1029{
1030 struct list_head *p;
1031 struct ocfs2_net_wait_ctxt *w = NULL;
1032
1033 list_for_each(p, &osb->net_response_list) {
1034 w = list_entry(p, struct ocfs2_net_wait_ctxt, n_list);
1035 if (response_id == w->n_response_id)
1036 break;
1037 w = NULL;
1038 }
1039
1040 return w;
1041}
1042
1043/* Translate response codes into local node errno values */
1044static inline int ocfs2_translate_response(int response)
1045{
1046 int ret;
1047
1048 switch (response) {
1049 case OCFS2_RESPONSE_OK:
1050 ret = 0;
1051 break;
1052
1053 case OCFS2_RESPONSE_BUSY:
1054 ret = -EBUSY;
1055 break;
1056
1057 default:
1058 ret = -EINVAL;
1059 }
1060
1061 return ret;
1062}
1063
1064static int ocfs2_handle_response_message(struct o2net_msg *msg,
1065 u32 len,
1066 void *data)
1067{
1068 unsigned int response_id, node_num;
1069 int response_status;
1070 struct ocfs2_super *osb = data;
1071 struct ocfs2_response_msg *resp;
1072 struct ocfs2_net_wait_ctxt * w;
1073 struct ocfs2_net_response_cb *resp_cb;
1074
1075 resp = (struct ocfs2_response_msg *) msg->buf;
1076
1077 response_id = be32_to_cpu(resp->r_hdr.h_response_id);
1078 node_num = be32_to_cpu(resp->r_hdr.h_node_num);
1079 response_status =
1080 ocfs2_translate_response(be32_to_cpu(resp->r_response));
1081
1082 mlog(0, "received response message:\n");
1083 mlog(0, "h_response_id = %u\n", response_id);
1084 mlog(0, "h_request = %u\n", be32_to_cpu(resp->r_hdr.h_request));
1085 mlog(0, "h_blkno = %"MLFu64"\n", be64_to_cpu(resp->r_hdr.h_blkno));
1086 mlog(0, "h_generation = %u\n", be32_to_cpu(resp->r_hdr.h_generation));
1087 mlog(0, "h_node_num = %u\n", node_num);
1088 mlog(0, "r_response = %d\n", response_status);
1089
1090 spin_lock(&osb->net_response_lock);
1091 w = __ocfs2_find_net_wait_ctxt(osb, response_id);
1092 if (!w) {
1093 mlog(0, "request not found!\n");
1094 goto bail;
1095 }
1096 resp_cb = w->n_callback;
1097
1098 if (response_status && (!w->n_response)) {
1099 /* we only really need one negative response so don't
1100 * set it twice. */
1101 w->n_response = response_status;
1102 }
1103
1104 if (resp_cb) {
1105 spin_unlock(&osb->net_response_lock);
1106
1107 resp_cb->rc_cb(resp_cb->rc_priv, resp);
1108
1109 spin_lock(&osb->net_response_lock);
1110 }
1111
1112 __ocfs2_mark_node_responded(osb, w, node_num);
1113bail:
1114 spin_unlock(&osb->net_response_lock);
1115
1116 return 0;
1117}
1118
1119static int ocfs2_handle_vote_message(struct o2net_msg *msg,
1120 u32 len,
1121 void *data)
1122{
1123 int status;
1124 struct ocfs2_super *osb = data;
1125 struct ocfs2_vote_work *work;
1126
1127 work = kmalloc(sizeof(struct ocfs2_vote_work), GFP_KERNEL);
1128 if (!work) {
1129 status = -ENOMEM;
1130 mlog_errno(status);
1131 goto bail;
1132 }
1133
1134 INIT_LIST_HEAD(&work->w_list);
1135 memcpy(&work->w_msg, msg->buf, sizeof(struct ocfs2_vote_msg));
1136
1137 mlog(0, "scheduling vote request:\n");
1138 mlog(0, "h_response_id = %u\n",
1139 be32_to_cpu(work->w_msg.v_hdr.h_response_id));
1140 mlog(0, "h_request = %u\n", be32_to_cpu(work->w_msg.v_hdr.h_request));
1141 mlog(0, "h_blkno = %"MLFu64"\n",
1142 be64_to_cpu(work->w_msg.v_hdr.h_blkno));
1143 mlog(0, "h_generation = %u\n",
1144 be32_to_cpu(work->w_msg.v_hdr.h_generation));
1145 mlog(0, "h_node_num = %u\n",
1146 be32_to_cpu(work->w_msg.v_hdr.h_node_num));
1147 mlog(0, "v_generic1 = %u\n", be32_to_cpu(work->w_msg.md1.v_generic1));
1148
1149 spin_lock(&osb->vote_task_lock);
1150 list_add_tail(&work->w_list, &osb->vote_list);
1151 osb->vote_count++;
1152 spin_unlock(&osb->vote_task_lock);
1153
1154 ocfs2_kick_vote_thread(osb);
1155
1156 status = 0;
1157bail:
1158 return status;
1159}
1160
1161void ocfs2_unregister_net_handlers(struct ocfs2_super *osb)
1162{
1163 if (!osb->net_key)
1164 return;
1165
1166 o2net_unregister_handler_list(&osb->osb_net_handlers);
1167
1168 if (!list_empty(&osb->net_response_list))
1169 mlog(ML_ERROR, "net response list not empty!\n");
1170
1171 osb->net_key = 0;
1172}
1173
1174int ocfs2_register_net_handlers(struct ocfs2_super *osb)
1175{
1176 int status = 0;
1177
1178 status = o2net_register_handler(OCFS2_MESSAGE_TYPE_RESPONSE,
1179 osb->net_key,
1180 sizeof(struct ocfs2_response_msg),
1181 ocfs2_handle_response_message,
1182 osb, &osb->osb_net_handlers);
1183 if (status) {
1184 mlog_errno(status);
1185 goto bail;
1186 }
1187
1188 status = o2net_register_handler(OCFS2_MESSAGE_TYPE_VOTE,
1189 osb->net_key,
1190 sizeof(struct ocfs2_vote_msg),
1191 ocfs2_handle_vote_message,
1192 osb, &osb->osb_net_handlers);
1193 if (status) {
1194 mlog_errno(status);
1195 goto bail;
1196 }
1197bail:
1198 if (status < 0)
1199 ocfs2_unregister_net_handlers(osb);
1200
1201 return status;
1202}