Merge branch 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2

* 'upstream-linus' of master.kernel.org:/pub/scm/linux/kernel/git/mfasheh/ocfs2: (22 commits)
  configfs: Zero terminate data in configfs attribute writes.
  [PATCH] ocfs2 heartbeat: clean up bio submission code
  ocfs2: introduce sc->sc_send_lock to protect outbound outbound messages
  [PATCH] ocfs2: drop INET from Kconfig, not needed
  ocfs2_dlm: Add timeout to dlm join domain
  ocfs2_dlm: Silence some messages during join domain
  ocfs2_dlm: disallow a domain join if node maps mismatch
  ocfs2_dlm: Ensure correct ordering of set/clear refmap bit on lockres
  ocfs2: Binds listener to the configured ip address
  ocfs2_dlm: Calling post handler function in assert master handler
  ocfs2: Added post handler callable function in o2net message handler
  ocfs2_dlm: Cookies in locks not being printed correctly in error messages
  ocfs2_dlm: Silence a failed convert
  ocfs2_dlm: wake up sleepers on the lockres waitqueue
  ocfs2_dlm: Dlm dispatch was stopping too early
  ocfs2_dlm: Drop inflight refmap even if no locks found on the lockres
  ocfs2_dlm: Flush dlm workqueue before starting to migrate
  ocfs2_dlm: Fix migrate lockres handler queue scanning
  ocfs2_dlm: Make dlmunlock() wait for migration to complete
  ocfs2_dlm: Fixes race between migrate and dirty
  ...
diff --git a/fs/Kconfig b/fs/Kconfig
index 8cd2417..5e8e9d9 100644
--- a/fs/Kconfig
+++ b/fs/Kconfig
@@ -426,7 +426,6 @@
 	select CONFIGFS_FS
 	select JBD
 	select CRC32
-	select INET
 	help
 	  OCFS2 is a general purpose extent based shared disk cluster file
 	  system with many similarities to ext3. It supports 64 bit inode
diff --git a/fs/configfs/file.c b/fs/configfs/file.c
index 2a7cb08..d98be5e 100644
--- a/fs/configfs/file.c
+++ b/fs/configfs/file.c
@@ -162,14 +162,17 @@
 	int error;
 
 	if (!buffer->page)
-		buffer->page = (char *)get_zeroed_page(GFP_KERNEL);
+		buffer->page = (char *)__get_free_pages(GFP_KERNEL, 0);
 	if (!buffer->page)
 		return -ENOMEM;
 
-	if (count > PAGE_SIZE)
-		count = PAGE_SIZE;
+	if (count >= PAGE_SIZE)
+		count = PAGE_SIZE - 1;
 	error = copy_from_user(buffer->page,buf,count);
 	buffer->needs_read_fill = 1;
+	/* if buf is assumed to contain a string, terminate it by \0,
+	 * so e.g. sscanf() can scan the string easily */
+	buffer->page[count] = 0;
 	return error ? -EFAULT : count;
 }
 
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 277ca67..5a9779b 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -184,10 +184,9 @@
 	flush_scheduled_work();
 }
 
-static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
-				      unsigned int num_ios)
+static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
 {
-	atomic_set(&wc->wc_num_reqs, num_ios);
+	atomic_set(&wc->wc_num_reqs, 1);
 	init_completion(&wc->wc_io_complete);
 	wc->wc_error = 0;
 }
@@ -212,6 +211,7 @@
 	struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
 
 	blk_run_address_space(mapping);
+	o2hb_bio_wait_dec(wc, 1);
 
 	wait_for_completion(&wc->wc_io_complete);
 }
@@ -231,6 +231,7 @@
 		return 1;
 
 	o2hb_bio_wait_dec(wc, 1);
+	bio_put(bio);
 	return 0;
 }
 
@@ -238,23 +239,22 @@
  * start_slot. */
 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
 				      struct o2hb_bio_wait_ctxt *wc,
-				      unsigned int start_slot,
-				      unsigned int num_slots)
+				      unsigned int *current_slot,
+				      unsigned int max_slots)
 {
-	int i, nr_vecs, len, first_page, last_page;
+	int len, current_page;
 	unsigned int vec_len, vec_start;
 	unsigned int bits = reg->hr_block_bits;
 	unsigned int spp = reg->hr_slots_per_page;
+	unsigned int cs = *current_slot;
 	struct bio *bio;
 	struct page *page;
 
-	nr_vecs = (num_slots + spp - 1) / spp;
-
 	/* Testing has shown this allocation to take long enough under
 	 * GFP_KERNEL that the local node can get fenced. It would be
 	 * nicest if we could pre-allocate these bios and avoid this
 	 * all together. */
-	bio = bio_alloc(GFP_ATOMIC, nr_vecs);
+	bio = bio_alloc(GFP_ATOMIC, 16);
 	if (!bio) {
 		mlog(ML_ERROR, "Could not alloc slots BIO!\n");
 		bio = ERR_PTR(-ENOMEM);
@@ -262,137 +262,53 @@
 	}
 
 	/* Must put everything in 512 byte sectors for the bio... */
-	bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
+	bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
 	bio->bi_bdev = reg->hr_bdev;
 	bio->bi_private = wc;
 	bio->bi_end_io = o2hb_bio_end_io;
 
-	first_page = start_slot / spp;
-	last_page = first_page + nr_vecs;
-	vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
-	for(i = first_page; i < last_page; i++) {
-		page = reg->hr_slot_data[i];
+	vec_start = (cs << bits) % PAGE_CACHE_SIZE;
+	while(cs < max_slots) {
+		current_page = cs / spp;
+		page = reg->hr_slot_data[current_page];
 
-		vec_len = PAGE_CACHE_SIZE;
-		/* last page might be short */
-		if (((i + 1) * spp) > (start_slot + num_slots))
-			vec_len = ((num_slots + start_slot) % spp) << bits;
-		vec_len -=  vec_start;
+		vec_len = min(PAGE_CACHE_SIZE,
+			      (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
 
 		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
-		     i, vec_len, vec_start);
+		     current_page, vec_len, vec_start);
 
 		len = bio_add_page(bio, page, vec_len, vec_start);
-		if (len != vec_len) {
-			bio_put(bio);
-			bio = ERR_PTR(-EIO);
+		if (len != vec_len) break;
 
-			mlog(ML_ERROR, "Error adding page to bio i = %d, "
-			     "vec_len = %u, len = %d\n, start = %u\n",
-			     i, vec_len, len, vec_start);
-			goto bail;
-		}
-
+		cs += vec_len / (PAGE_CACHE_SIZE/spp);
 		vec_start = 0;
 	}
 
 bail:
+	*current_slot = cs;
 	return bio;
 }
 
-/*
- * Compute the maximum number of sectors the bdev can handle in one bio,
- * as a power of two.
- *
- * Stolen from oracleasm, thanks Joel!
- */
-static int compute_max_sectors(struct block_device *bdev)
-{
-	int max_pages, max_sectors, pow_two_sectors;
-
-	struct request_queue *q;
-
-	q = bdev_get_queue(bdev);
-	max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
-	if (max_pages > BIO_MAX_PAGES)
-		max_pages = BIO_MAX_PAGES;
-	if (max_pages > q->max_phys_segments)
-		max_pages = q->max_phys_segments;
-	if (max_pages > q->max_hw_segments)
-		max_pages = q->max_hw_segments;
-	max_pages--; /* Handle I/Os that straddle a page */
-
-	if (max_pages) {
-		max_sectors = max_pages << (PAGE_SHIFT - 9);
-	} else {
-		/* If BIO contains 1 or less than 1 page. */
-		max_sectors = q->max_sectors;
-	}
-	/* Why is fls() 1-based???? */
-	pow_two_sectors = 1 << (fls(max_sectors) - 1);
-
-	return pow_two_sectors;
-}
-
-static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
-					       unsigned int num_slots,
-					       unsigned int *num_bios,
-					       unsigned int *slots_per_bio)
-{
-	unsigned int max_sectors, io_sectors;
-
-	max_sectors = compute_max_sectors(reg->hr_bdev);
-
-	io_sectors = num_slots << (reg->hr_block_bits - 9);
-
-	*num_bios = (io_sectors + max_sectors - 1) / max_sectors;
-	*slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);
-
-	mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
-	     "device can handle %u sectors of I/O\n", io_sectors, num_slots,
-	     max_sectors);
-	mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
-	     *num_bios, *slots_per_bio);
-}
-
 static int o2hb_read_slots(struct o2hb_region *reg,
 			   unsigned int max_slots)
 {
-	unsigned int num_bios, slots_per_bio, start_slot, num_slots;
-	int i, status;
+	unsigned int current_slot=0;
+	int status;
 	struct o2hb_bio_wait_ctxt wc;
-	struct bio **bios;
 	struct bio *bio;
 
-	o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);
+	o2hb_bio_wait_init(&wc);
 
-	bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
-	if (!bios) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		return status;
-	}
-
-	o2hb_bio_wait_init(&wc, num_bios);
-
-	num_slots = slots_per_bio;
-	for(i = 0; i < num_bios; i++) {
-		start_slot = i * slots_per_bio;
-
-		/* adjust num_slots at last bio */
-		if (max_slots < (start_slot + num_slots))
-			num_slots = max_slots - start_slot;
-
-		bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
+	while(current_slot < max_slots) {
+		bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
 		if (IS_ERR(bio)) {
-			o2hb_bio_wait_dec(&wc, num_bios - i);
-
 			status = PTR_ERR(bio);
 			mlog_errno(status);
 			goto bail_and_wait;
 		}
-		bios[i] = bio;
 
+		atomic_inc(&wc.wc_num_reqs);
 		submit_bio(READ, bio);
 	}
 
@@ -403,38 +319,30 @@
 	if (wc.wc_error && !status)
 		status = wc.wc_error;
 
-	if (bios) {
-		for(i = 0; i < num_bios; i++)
-			if (bios[i])
-				bio_put(bios[i]);
-		kfree(bios);
-	}
-
 	return status;
 }
 
 static int o2hb_issue_node_write(struct o2hb_region *reg,
-				 struct bio **write_bio,
 				 struct o2hb_bio_wait_ctxt *write_wc)
 {
 	int status;
 	unsigned int slot;
 	struct bio *bio;
 
-	o2hb_bio_wait_init(write_wc, 1);
+	o2hb_bio_wait_init(write_wc);
 
 	slot = o2nm_this_node();
 
-	bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
+	bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
 	if (IS_ERR(bio)) {
 		status = PTR_ERR(bio);
 		mlog_errno(status);
 		goto bail;
 	}
 
+	atomic_inc(&write_wc->wc_num_reqs);
 	submit_bio(WRITE, bio);
 
-	*write_bio = bio;
 	status = 0;
 bail:
 	return status;
@@ -826,7 +734,6 @@
 {
 	int i, ret, highest_node, change = 0;
 	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
-	struct bio *write_bio;
 	struct o2hb_bio_wait_ctxt write_wc;
 
 	ret = o2nm_configured_node_map(configured_nodes,
@@ -864,7 +771,7 @@
 
 	/* And fire off the write. Note that we don't wait on this I/O
 	 * until later. */
-	ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
+	ret = o2hb_issue_node_write(reg, &write_wc);
 	if (ret < 0) {
 		mlog_errno(ret);
 		return ret;
@@ -882,7 +789,6 @@
 	 * people we find in our steady state have seen us.
 	 */
 	o2hb_wait_on_io(reg, &write_wc);
-	bio_put(write_bio);
 	if (write_wc.wc_error) {
 		/* Do not re-arm the write timeout on I/O error - we
 		 * can't be sure that the new block ever made it to
@@ -943,7 +849,6 @@
 {
 	int i, ret;
 	struct o2hb_region *reg = data;
-	struct bio *write_bio;
 	struct o2hb_bio_wait_ctxt write_wc;
 	struct timeval before_hb, after_hb;
 	unsigned int elapsed_msec;
@@ -993,10 +898,9 @@
 	 *
 	 * XXX: Should we skip this on unclean_stop? */
 	o2hb_prepare_block(reg, 0);
-	ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
+	ret = o2hb_issue_node_write(reg, &write_wc);
 	if (ret == 0) {
 		o2hb_wait_on_io(reg, &write_wc);
-		bio_put(write_bio);
 	} else {
 		mlog_errno(ret);
 	}
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index ae4ff4a..1718215 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -556,6 +556,8 @@
 	sk->sk_data_ready = o2net_data_ready;
 	sk->sk_state_change = o2net_state_change;
 
+	mutex_init(&sc->sc_send_lock);
+
 	write_unlock_bh(&sk->sk_callback_lock);
 }
 
@@ -688,6 +690,7 @@
  * be given to the handler if their payload is longer than the max. */
 int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
 			   o2net_msg_handler_func *func, void *data,
+			   o2net_post_msg_handler_func *post_func,
 			   struct list_head *unreg_list)
 {
 	struct o2net_msg_handler *nmh = NULL;
@@ -722,6 +725,7 @@
 
 	nmh->nh_func = func;
 	nmh->nh_func_data = data;
+	nmh->nh_post_func = post_func;
 	nmh->nh_msg_type = msg_type;
 	nmh->nh_max_len = max_len;
 	nmh->nh_key = key;
@@ -856,10 +860,12 @@
 	ssize_t ret;
 
 
+	mutex_lock(&sc->sc_send_lock);
 	ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
 					 virt_to_page(kmalloced_virt),
 					 (long)kmalloced_virt & ~PAGE_MASK,
 					 size, MSG_DONTWAIT);
+	mutex_unlock(&sc->sc_send_lock);
 	if (ret != size) {
 		mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT 
 		     " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
@@ -974,8 +980,10 @@
 
 	/* finally, convert the message header to network byte-order
 	 * and send */
+	mutex_lock(&sc->sc_send_lock);
 	ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
 				 sizeof(struct o2net_msg) + caller_bytes);
+	mutex_unlock(&sc->sc_send_lock);
 	msglog(msg, "sending returned %d\n", ret);
 	if (ret < 0) {
 		mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
@@ -1049,6 +1057,7 @@
 	int ret = 0, handler_status;
 	enum  o2net_system_error syserr;
 	struct o2net_msg_handler *nmh = NULL;
+	void *ret_data = NULL;
 
 	msglog(hdr, "processing message\n");
 
@@ -1101,17 +1110,26 @@
 	sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
 	handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
 					     be16_to_cpu(hdr->data_len),
-					nmh->nh_func_data);
+					nmh->nh_func_data, &ret_data);
 	do_gettimeofday(&sc->sc_tv_func_stop);
 
 out_respond:
 	/* this destroys the hdr, so don't use it after this */
+	mutex_lock(&sc->sc_send_lock);
 	ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
 				      handler_status);
+	mutex_unlock(&sc->sc_send_lock);
 	hdr = NULL;
 	mlog(0, "sending handler status %d, syserr %d returned %d\n",
 	     handler_status, syserr, ret);
 
+	if (nmh) {
+		BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL);
+		if (nmh->nh_post_func)
+			(nmh->nh_post_func)(handler_status, nmh->nh_func_data,
+					    ret_data);
+	}
+
 out:
 	if (nmh)
 		o2net_handler_put(nmh);
@@ -1795,13 +1813,13 @@
 	ready(sk, bytes);
 }
 
-static int o2net_open_listening_sock(__be16 port)
+static int o2net_open_listening_sock(__be32 addr, __be16 port)
 {
 	struct socket *sock = NULL;
 	int ret;
 	struct sockaddr_in sin = {
 		.sin_family = PF_INET,
-		.sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) },
+		.sin_addr = { .s_addr = (__force u32)addr },
 		.sin_port = (__force u16)port,
 	};
 
@@ -1824,15 +1842,15 @@
 	sock->sk->sk_reuse = 1;
 	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
 	if (ret < 0) {
-		mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n",
-		     ntohs(port), ret);
+		mlog(ML_ERROR, "unable to bind socket at %u.%u.%u.%u:%u, "
+		     "ret=%d\n", NIPQUAD(addr), ntohs(port), ret);
 		goto out;
 	}
 
 	ret = sock->ops->listen(sock, 64);
 	if (ret < 0) {
-		mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n",
-		     ntohs(port), ret);
+		mlog(ML_ERROR, "unable to listen on %u.%u.%u.%u:%u, ret=%d\n",
+		     NIPQUAD(addr), ntohs(port), ret);
 	}
 
 out:
@@ -1865,7 +1883,8 @@
 		return -ENOMEM; /* ? */
 	}
 
-	ret = o2net_open_listening_sock(node->nd_ipv4_port);
+	ret = o2net_open_listening_sock(node->nd_ipv4_address,
+					node->nd_ipv4_port);
 	if (ret) {
 		destroy_workqueue(o2net_wq);
 		o2net_wq = NULL;
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 21a4e43..da880fc 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -50,7 +50,10 @@
 	__u8  buf[0];
 };
 
-typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data);
+typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data,
+				     void **ret_data);
+typedef void (o2net_post_msg_handler_func)(int status, void *data,
+					   void *ret_data);
 
 #define O2NET_MAX_PAYLOAD_BYTES  (4096 - sizeof(struct o2net_msg))
 
@@ -99,6 +102,7 @@
 
 int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
 			   o2net_msg_handler_func *func, void *data,
+			   o2net_post_msg_handler_func *post_func,
 			   struct list_head *unreg_list);
 void o2net_unregister_handler_list(struct list_head *list);
 
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index b700dc9..4dae5df 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -38,6 +38,12 @@
  * locking semantics of the file system using the protocol.  It should 
  * be somewhere else, I'm sure, but right now it isn't.
  *
+ * New in version 7:
+ * 	- DLM join domain includes the live nodemap
+ *
+ * New in version 6:
+ * 	- DLM lockres remote refcount fixes.
+ *
  * New in version 5:
  * 	- Network timeout checking protocol
  *
@@ -51,7 +57,7 @@
  * 	- full 64 bit i_size in the metadata lock lvbs
  * 	- introduction of "rw" lock and pushing meta/data locking down
  */
-#define O2NET_PROTOCOL_VERSION 5ULL
+#define O2NET_PROTOCOL_VERSION 7ULL
 struct o2net_handshake {
 	__be64	protocol_version;
 	__be64	connector_id;
@@ -149,6 +155,8 @@
 	struct timeval 		sc_tv_func_stop;
 	u32			sc_msg_key;
 	u16			sc_msg_type;
+
+	struct mutex		sc_send_lock;
 };
 
 struct o2net_msg_handler {
@@ -158,6 +166,8 @@
 	u32			nh_key;
 	o2net_msg_handler_func	*nh_func;
 	o2net_msg_handler_func	*nh_func_data;
+	o2net_post_msg_handler_func
+				*nh_post_func;
 	struct kref		nh_kref;
 	struct list_head	nh_unregister_item;
 };
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index 681046d..241cad3 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -263,7 +263,8 @@
 
 
 
-int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
+			  void **ret_data)
 {
 	int ret;
 	unsigned int locklen;
@@ -311,8 +312,8 @@
 	    past->type != DLM_BAST) {
 		mlog(ML_ERROR, "Unknown ast type! %d, cookie=%u:%llu"
 		     "name=%.*s\n", past->type, 
-		     dlm_get_lock_cookie_node(cookie),
-		     dlm_get_lock_cookie_seq(cookie),
+		     dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
 		     locklen, name);
 		ret = DLM_IVLOCKID;
 		goto leave;
@@ -323,8 +324,8 @@
 		mlog(0, "got %sast for unknown lockres! "
 		     "cookie=%u:%llu, name=%.*s, namelen=%u\n",
 		     past->type == DLM_AST ? "" : "b",
-		     dlm_get_lock_cookie_node(cookie),
-		     dlm_get_lock_cookie_seq(cookie),
+		     dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
 		     locklen, name, locklen);
 		ret = DLM_IVLOCKID;
 		goto leave;
@@ -369,7 +370,8 @@
 
 	mlog(0, "got %sast for unknown lock!  cookie=%u:%llu, "
 	     "name=%.*s, namelen=%u\n", past->type == DLM_AST ? "" : "b", 
-	     dlm_get_lock_cookie_node(cookie), dlm_get_lock_cookie_seq(cookie),
+	     dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
+	     dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
 	     locklen, name, locklen);
 
 	ret = DLM_NORMAL;
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 6b6ff76..e90b92f 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -180,6 +180,11 @@
 	unsigned ignore_higher:1;
 };
 
+struct dlm_deref_lockres_priv
+{
+	struct dlm_lock_resource *deref_res;
+	u8 deref_node;
+};
 
 struct dlm_work_item
 {
@@ -191,6 +196,7 @@
 		struct dlm_request_all_locks_priv ral;
 		struct dlm_mig_lockres_priv ml;
 		struct dlm_assert_master_priv am;
+		struct dlm_deref_lockres_priv dl;
 	} u;
 };
 
@@ -222,6 +228,9 @@
 #define DLM_LOCK_RES_DIRTY                0x00000008
 #define DLM_LOCK_RES_IN_PROGRESS          0x00000010
 #define DLM_LOCK_RES_MIGRATING            0x00000020
+#define DLM_LOCK_RES_DROPPING_REF         0x00000040
+#define DLM_LOCK_RES_BLOCK_DIRTY          0x00001000
+#define DLM_LOCK_RES_SETREF_INPROG        0x00002000
 
 /* max milliseconds to wait to sync up a network failure with a node death */
 #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
@@ -265,6 +274,8 @@
 	u8  owner;              //node which owns the lock resource, or unknown
 	u16 state;
 	char lvb[DLM_LVB_LEN];
+	unsigned int inflight_locks;
+	unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 };
 
 struct dlm_migratable_lock
@@ -367,7 +378,7 @@
 	DLM_CONVERT_LOCK_MSG,	 /* 504 */
 	DLM_PROXY_AST_MSG,	 /* 505 */
 	DLM_UNLOCK_LOCK_MSG,	 /* 506 */
-	DLM_UNUSED_MSG2,	 /* 507 */
+	DLM_DEREF_LOCKRES_MSG,	 /* 507 */
 	DLM_MIGRATE_REQUEST_MSG, /* 508 */
 	DLM_MIG_LOCKRES_MSG, 	 /* 509 */
 	DLM_QUERY_JOIN_MSG,	 /* 510 */
@@ -417,6 +428,9 @@
 	u8 name[O2NM_MAX_NAME_LEN];
 };
 
+#define DLM_ASSERT_RESPONSE_REASSERT       0x00000001
+#define DLM_ASSERT_RESPONSE_MASTERY_REF    0x00000002
+
 #define DLM_ASSERT_MASTER_MLE_CLEANUP      0x00000001
 #define DLM_ASSERT_MASTER_REQUERY          0x00000002
 #define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
@@ -430,6 +444,8 @@
 	u8 name[O2NM_MAX_NAME_LEN];
 };
 
+#define DLM_MIGRATE_RESPONSE_MASTERY_REF   0x00000001
+
 struct dlm_migrate_request
 {
 	u8 master;
@@ -609,12 +625,16 @@
 };
 
 
+#define BITS_PER_BYTE 8
+#define BITS_TO_BYTES(bits) (((bits)+BITS_PER_BYTE-1)/BITS_PER_BYTE)
+
 struct dlm_query_join_request
 {
 	u8 node_idx;
 	u8 pad1[2];
 	u8 name_len;
 	u8 domain[O2NM_MAX_NAME_LEN];
+	u8 node_map[BITS_TO_BYTES(O2NM_MAX_NODES)];
 };
 
 struct dlm_assert_joined
@@ -648,6 +668,16 @@
 	__be32 pad2;
 };
 
+struct dlm_deref_lockres
+{
+	u32 pad1;
+	u16 pad2;
+	u8 node_idx;
+	u8 namelen;
+
+	u8 name[O2NM_MAX_NAME_LEN];
+};
+
 static inline enum dlm_status
 __dlm_lockres_state_to_status(struct dlm_lock_resource *res)
 {
@@ -688,16 +718,20 @@
 void dlm_lock_attach_lockres(struct dlm_lock *lock,
 			     struct dlm_lock_resource *res);
 
-int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+			    void **ret_data);
+int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+			     void **ret_data);
+int dlm_proxy_ast_handler(struct o2net_msg *msg, u32 len, void *data,
+			  void **ret_data);
 
 void dlm_revert_pending_convert(struct dlm_lock_resource *res,
 				struct dlm_lock *lock);
 void dlm_revert_pending_lock(struct dlm_lock_resource *res,
 			     struct dlm_lock *lock);
 
-int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+			    void **ret_data);
 void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
 			       struct dlm_lock *lock);
 void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
@@ -721,8 +755,6 @@
 			      struct dlm_lock_resource *res);
 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
 			    struct dlm_lock_resource *res);
-void dlm_purge_lockres(struct dlm_ctxt *dlm,
-		       struct dlm_lock_resource *lockres);
 static inline void dlm_lockres_get(struct dlm_lock_resource *res)
 {
 	/* This is called on every lookup, so it might be worth
@@ -733,6 +765,10 @@
 void __dlm_unhash_lockres(struct dlm_lock_resource *res);
 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
 			  struct dlm_lock_resource *res);
+struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
+						     const char *name,
+						     unsigned int len,
+						     unsigned int hash);
 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
 						const char *name,
 						unsigned int len,
@@ -753,6 +789,47 @@
 					  const char *name,
 					  unsigned int namelen);
 
+#define dlm_lockres_set_refmap_bit(bit,res)  \
+	__dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
+#define dlm_lockres_clear_refmap_bit(bit,res)  \
+	__dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
+
+static inline void __dlm_lockres_set_refmap_bit(int bit,
+						struct dlm_lock_resource *res,
+						const char *file,
+						int line)
+{
+	//printk("%s:%d:%.*s: setting bit %d\n", file, line,
+	//     res->lockname.len, res->lockname.name, bit);
+	set_bit(bit, res->refmap);
+}
+
+static inline void __dlm_lockres_clear_refmap_bit(int bit,
+						  struct dlm_lock_resource *res,
+						  const char *file,
+						  int line)
+{
+	//printk("%s:%d:%.*s: clearing bit %d\n", file, line,
+	//     res->lockname.len, res->lockname.name, bit);
+	clear_bit(bit, res->refmap);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+				   struct dlm_lock_resource *res,
+				   const char *file,
+				   int line);
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+				   struct dlm_lock_resource *res,
+				   int new_lockres,
+				   const char *file,
+				   int line);
+#define dlm_lockres_drop_inflight_ref(d,r)  \
+	__dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref(d,r)  \
+	__dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref_new(d,r)  \
+	__dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
+
 void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 void dlm_do_local_ast(struct dlm_ctxt *dlm,
@@ -801,10 +878,7 @@
 void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data);
 void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data);
 
-int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
-int dlm_migrate_lockres(struct dlm_ctxt *dlm,
-			struct dlm_lock_resource *res,
-			u8 target);
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
 int dlm_finish_migration(struct dlm_ctxt *dlm,
 			 struct dlm_lock_resource *res,
 			 u8 old_master);
@@ -812,15 +886,27 @@
 			     struct dlm_lock_resource *res);
 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res);
 
-int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
-int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
+			       void **ret_data);
+int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
+			      void **ret_data);
+void dlm_assert_master_post_handler(int status, void *data, void *ret_data);
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+			      void **ret_data);
+int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
+				void **ret_data);
+int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+			    void **ret_data);
+int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
+			       void **ret_data);
+int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
+				  void **ret_data);
+int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
+			       void **ret_data);
+int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+			   void **ret_data);
+int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+			      void **ret_data);
 int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 			  u8 nodenum, u8 *real_master);
 
@@ -856,10 +942,12 @@
 int dlm_init_mle_cache(void);
 void dlm_destroy_mle_cache(void);
 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
+			 struct dlm_lock_resource *res);
 void dlm_clean_master_list(struct dlm_ctxt *dlm,
 			   u8 dead_node);
 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
-
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
 int __dlm_lockres_unused(struct dlm_lock_resource *res);
 
 static inline const char * dlm_lock_mode_name(int mode)
diff --git a/fs/ocfs2/dlm/dlmconvert.c b/fs/ocfs2/dlm/dlmconvert.c
index c764dc8..ecb4d99 100644
--- a/fs/ocfs2/dlm/dlmconvert.c
+++ b/fs/ocfs2/dlm/dlmconvert.c
@@ -286,8 +286,8 @@
 		__dlm_print_one_lock_resource(res);
 		mlog(ML_ERROR, "converting a remote lock that is already "
 		     "converting! (cookie=%u:%llu, conv=%d)\n",
-		     dlm_get_lock_cookie_node(lock->ml.cookie),
-		     dlm_get_lock_cookie_seq(lock->ml.cookie),
+		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 		     lock->ml.convert_type);
 		status = DLM_DENIED;
 		goto bail;
@@ -418,7 +418,8 @@
  * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
  *          status from __dlmconvert_master
  */
-int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+			     void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
@@ -428,7 +429,7 @@
 	struct dlm_lockstatus *lksb;
 	enum dlm_status status = DLM_NORMAL;
 	u32 flags;
-	int call_ast = 0, kick_thread = 0, ast_reserved = 0;
+	int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
 
 	if (!dlm_grab(dlm)) {
 		dlm_error(DLM_REJECTED);
@@ -479,25 +480,14 @@
 		}
 		lock = NULL;
 	}
-	if (!lock) {
-		__dlm_print_one_lock_resource(res);
-		list_for_each(iter, &res->granted) {
-			lock = list_entry(iter, struct dlm_lock, list);
-			if (lock->ml.node == cnv->node_idx) {
-				mlog(ML_ERROR, "There is something here "
-				     "for node %u, lock->ml.cookie=%llu, "
-				     "cnv->cookie=%llu\n", cnv->node_idx,
-				     (unsigned long long)lock->ml.cookie,
-				     (unsigned long long)cnv->cookie);
-				break;
-			}
-		}
-		lock = NULL;
-	}
 	spin_unlock(&res->spinlock);
 	if (!lock) {
 		status = DLM_IVLOCKID;
-		dlm_error(status);
+		mlog(ML_ERROR, "did not find lock to convert on grant queue! "
+			       "cookie=%u:%llu\n",
+		     dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
+		__dlm_print_one_lock_resource(res);
 		goto leave;
 	}
 
@@ -524,8 +514,11 @@
 					     cnv->requested_type,
 					     &call_ast, &kick_thread);
 		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
+		wake = 1;
 	}
 	spin_unlock(&res->spinlock);
+	if (wake)
+		wake_up(&res->wq);
 
 	if (status != DLM_NORMAL) {
 		if (status != DLM_NOTQUEUED)
@@ -534,12 +527,7 @@
 	}
 
 leave:
-	if (!lock)
-		mlog(ML_ERROR, "did not find lock to convert on grant queue! "
-			       "cookie=%u:%llu\n",
-			       dlm_get_lock_cookie_node(cnv->cookie),
-			       dlm_get_lock_cookie_seq(cnv->cookie));
-	else
+	if (lock)
 		dlm_lock_put(lock);
 
 	/* either queue the ast or release it, if reserved */
diff --git a/fs/ocfs2/dlm/dlmdebug.c b/fs/ocfs2/dlm/dlmdebug.c
index 3f6c8d8..64239b3 100644
--- a/fs/ocfs2/dlm/dlmdebug.c
+++ b/fs/ocfs2/dlm/dlmdebug.c
@@ -53,6 +53,23 @@
 	spin_unlock(&res->spinlock);
 }
 
+static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
+{
+	int bit;
+	assert_spin_locked(&res->spinlock);
+
+	mlog(ML_NOTICE, "  refmap nodes: [ ");
+	bit = 0;
+	while (1) {
+		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
+		if (bit >= O2NM_MAX_NODES)
+			break;
+		printk("%u ", bit);
+		bit++;
+	}
+	printk("], inflight=%u\n", res->inflight_locks);
+}
+
 void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
 {
 	struct list_head *iter2;
@@ -65,6 +82,7 @@
 	       res->owner, res->state);
 	mlog(ML_NOTICE, "  last used: %lu, on purge list: %s\n",
 	     res->last_used, list_empty(&res->purge) ? "no" : "yes");
+	dlm_print_lockres_refmap(res);
 	mlog(ML_NOTICE, "  granted queue: \n");
 	list_for_each(iter2, &res->granted) {
 		lock = list_entry(iter2, struct dlm_lock, list);
@@ -72,8 +90,8 @@
 		mlog(ML_NOTICE, "    type=%d, conv=%d, node=%u, "
 		       "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 
 		       lock->ml.type, lock->ml.convert_type, lock->ml.node, 
-		       dlm_get_lock_cookie_node(lock->ml.cookie), 
-		       dlm_get_lock_cookie_seq(lock->ml.cookie), 
+		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 		       list_empty(&lock->ast_list) ? 'y' : 'n',
 		       lock->ast_pending ? 'y' : 'n',
 		       list_empty(&lock->bast_list) ? 'y' : 'n',
@@ -87,8 +105,8 @@
 		mlog(ML_NOTICE, "    type=%d, conv=%d, node=%u, "
 		       "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 
 		       lock->ml.type, lock->ml.convert_type, lock->ml.node, 
-		       dlm_get_lock_cookie_node(lock->ml.cookie), 
-		       dlm_get_lock_cookie_seq(lock->ml.cookie), 
+		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 		       list_empty(&lock->ast_list) ? 'y' : 'n',
 		       lock->ast_pending ? 'y' : 'n',
 		       list_empty(&lock->bast_list) ? 'y' : 'n',
@@ -102,8 +120,8 @@
 		mlog(ML_NOTICE, "    type=%d, conv=%d, node=%u, "
 		       "cookie=%u:%llu, ast=(empty=%c,pend=%c), bast=(empty=%c,pend=%c)\n", 
 		       lock->ml.type, lock->ml.convert_type, lock->ml.node, 
-		       dlm_get_lock_cookie_node(lock->ml.cookie), 
-		       dlm_get_lock_cookie_seq(lock->ml.cookie), 
+		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 		       list_empty(&lock->ast_list) ? 'y' : 'n',
 		       lock->ast_pending ? 'y' : 'n',
 		       list_empty(&lock->bast_list) ? 'y' : 'n',
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index f0b25f2..6087c47 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -48,6 +48,36 @@
 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
 #include "cluster/masklog.h"
 
+/*
+ * ocfs2 node maps are array of long int, which limits to send them freely
+ * across the wire due to endianness issues. To workaround this, we convert
+ * long ints to byte arrays. Following 3 routines are helper functions to
+ * set/test/copy bits within those array of bytes
+ */
+static inline void byte_set_bit(u8 nr, u8 map[])
+{
+	map[nr >> 3] |= (1UL << (nr & 7));
+}
+
+static inline int byte_test_bit(u8 nr, u8 map[])
+{
+	return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
+}
+
+static inline void byte_copymap(u8 dmap[], unsigned long smap[],
+			unsigned int sz)
+{
+	unsigned int nn;
+
+	if (!sz)
+		return;
+
+	memset(dmap, 0, ((sz + 7) >> 3));
+	for (nn = 0 ; nn < sz; nn++)
+		if (test_bit(nn, smap))
+			byte_set_bit(nn, dmap);
+}
+
 static void dlm_free_pagevec(void **vec, int pages)
 {
 	while (pages--)
@@ -95,10 +125,14 @@
 
 #define DLM_DOMAIN_BACKOFF_MS 200
 
-static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data);
-static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data);
-static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data);
-static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data);
+static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
+				  void **ret_data);
+static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
+				     void **ret_data);
+static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
+				   void **ret_data);
+static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
+				   void **ret_data);
 
 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 
@@ -125,10 +159,10 @@
 	hlist_add_head(&res->hash_node, bucket);
 }
 
-struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
-						const char *name,
-						unsigned int len,
-						unsigned int hash)
+struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
+						     const char *name,
+						     unsigned int len,
+						     unsigned int hash)
 {
 	struct hlist_head *bucket;
 	struct hlist_node *list;
@@ -154,6 +188,37 @@
 	return NULL;
 }
 
+/* intended to be called by functions which do not care about lock
+ * resources which are being purged (most net _handler functions).
+ * this will return NULL for any lock resource which is found but
+ * currently in the process of dropping its mastery reference.
+ * use __dlm_lookup_lockres_full when you need the lock resource
+ * regardless (e.g. dlm_get_lock_resource) */
+struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
+						const char *name,
+						unsigned int len,
+						unsigned int hash)
+{
+	struct dlm_lock_resource *res = NULL;
+
+	mlog_entry("%.*s\n", len, name);
+
+	assert_spin_locked(&dlm->spinlock);
+
+	res = __dlm_lookup_lockres_full(dlm, name, len, hash);
+	if (res) {
+		spin_lock(&res->spinlock);
+		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
+			spin_unlock(&res->spinlock);
+			dlm_lockres_put(res);
+			return NULL;
+		}
+		spin_unlock(&res->spinlock);
+	}
+
+	return res;
+}
+
 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 				    const char *name,
 				    unsigned int len)
@@ -330,43 +395,60 @@
 	wake_up(&dlm_domain_events);
 }
 
-static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
+static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 {
-	int i;
+	int i, num, n, ret = 0;
 	struct dlm_lock_resource *res;
+	struct hlist_node *iter;
+	struct hlist_head *bucket;
+	int dropped;
 
 	mlog(0, "Migrating locks from domain %s\n", dlm->name);
-restart:
+
+	num = 0;
 	spin_lock(&dlm->spinlock);
 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-		while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
-			res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
-					  struct dlm_lock_resource, hash_node);
-			/* need reference when manually grabbing lockres */
+redo_bucket:
+		n = 0;
+		bucket = dlm_lockres_hash(dlm, i);
+		iter = bucket->first;
+		while (iter) {
+			n++;
+			res = hlist_entry(iter, struct dlm_lock_resource,
+					  hash_node);
 			dlm_lockres_get(res);
-			/* this should unhash the lockres
-			 * and exit with dlm->spinlock */
-			mlog(0, "purging res=%p\n", res);
-			if (dlm_lockres_is_dirty(dlm, res)) {
-				/* HACK!  this should absolutely go.
-				 * need to figure out why some empty
-				 * lockreses are still marked dirty */
-				mlog(ML_ERROR, "lockres %.*s dirty!\n",
-				     res->lockname.len, res->lockname.name);
+			/* migrate, if necessary.  this will drop the dlm
+			 * spinlock and retake it if it does migration. */
+			dropped = dlm_empty_lockres(dlm, res);
 
-				spin_unlock(&dlm->spinlock);
-				dlm_kick_thread(dlm, res);
-				wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
-				dlm_lockres_put(res);
-				goto restart;
-			}
-			dlm_purge_lockres(dlm, res);
+			spin_lock(&res->spinlock);
+			__dlm_lockres_calc_usage(dlm, res);
+			iter = res->hash_node.next;
+			spin_unlock(&res->spinlock);
+
 			dlm_lockres_put(res);
+
+			cond_resched_lock(&dlm->spinlock);
+
+			if (dropped)
+				goto redo_bucket;
 		}
+		num += n;
+		mlog(0, "%s: touched %d lockreses in bucket %d "
+		     "(tot=%d)\n", dlm->name, n, i, num);
 	}
 	spin_unlock(&dlm->spinlock);
+	wake_up(&dlm->dlm_thread_wq);
 
+	/* let the dlm thread take care of purging, keep scanning until
+	 * nothing remains in the hash */
+	if (num) {
+		mlog(0, "%s: %d lock resources in hash last pass\n",
+		     dlm->name, num);
+		ret = -EAGAIN;
+	}
 	mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
+	return ret;
 }
 
 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
@@ -418,7 +500,8 @@
 	printk("\n");
 }
 
-static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data)
+static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
+				   void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	unsigned int node;
@@ -571,7 +654,9 @@
 		/* We changed dlm state, notify the thread */
 		dlm_kick_thread(dlm, NULL);
 
-		dlm_migrate_all_locks(dlm);
+		while (dlm_migrate_all_locks(dlm)) {
+			mlog(0, "%s: more migration to do\n", dlm->name);
+		}
 		dlm_mark_domain_leaving(dlm);
 		dlm_leave_domain(dlm);
 		dlm_complete_dlm_shutdown(dlm);
@@ -580,11 +665,13 @@
 }
 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
 
-static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
+static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
+				  void **ret_data)
 {
 	struct dlm_query_join_request *query;
 	enum dlm_query_join_response response;
 	struct dlm_ctxt *dlm = NULL;
+	u8 nodenum;
 
 	query = (struct dlm_query_join_request *) msg->buf;
 
@@ -608,6 +695,28 @@
 
 	spin_lock(&dlm_domain_lock);
 	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
+	if (!dlm)
+		goto unlock_respond;
+
+	/*
+	 * There is a small window where the joining node may not see the
+	 * node(s) that just left but still part of the cluster. DISALLOW
+	 * join request if joining node has different node map.
+	 */
+	nodenum=0;
+	while (nodenum < O2NM_MAX_NODES) {
+		if (test_bit(nodenum, dlm->domain_map)) {
+			if (!byte_test_bit(nodenum, query->node_map)) {
+				mlog(0, "disallow join as node %u does not "
+				     "have node %u in its nodemap\n",
+				     query->node_idx, nodenum);
+				response = JOIN_DISALLOW;
+				goto unlock_respond;
+			}
+		}
+		nodenum++;
+	}
+
 	/* Once the dlm ctxt is marked as leaving then we don't want
 	 * to be put in someone's domain map. 
 	 * Also, explicitly disallow joining at certain troublesome
@@ -626,15 +735,15 @@
 			/* Disallow parallel joins. */
 			response = JOIN_DISALLOW;
 		} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
-			mlog(ML_NOTICE, "node %u trying to join, but recovery "
+			mlog(0, "node %u trying to join, but recovery "
 			     "is ongoing.\n", bit);
 			response = JOIN_DISALLOW;
 		} else if (test_bit(bit, dlm->recovery_map)) {
-			mlog(ML_NOTICE, "node %u trying to join, but it "
+			mlog(0, "node %u trying to join, but it "
 			     "still needs recovery.\n", bit);
 			response = JOIN_DISALLOW;
 		} else if (test_bit(bit, dlm->domain_map)) {
-			mlog(ML_NOTICE, "node %u trying to join, but it "
+			mlog(0, "node %u trying to join, but it "
 			     "is still in the domain! needs recovery?\n",
 			     bit);
 			response = JOIN_DISALLOW;
@@ -649,6 +758,7 @@
 
 		spin_unlock(&dlm->spinlock);
 	}
+unlock_respond:
 	spin_unlock(&dlm_domain_lock);
 
 respond:
@@ -657,7 +767,8 @@
 	return response;
 }
 
-static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data)
+static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
+				     void **ret_data)
 {
 	struct dlm_assert_joined *assert;
 	struct dlm_ctxt *dlm = NULL;
@@ -694,7 +805,8 @@
 	return 0;
 }
 
-static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data)
+static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
+				   void **ret_data)
 {
 	struct dlm_cancel_join *cancel;
 	struct dlm_ctxt *dlm = NULL;
@@ -796,6 +908,9 @@
 	join_msg.name_len = strlen(dlm->name);
 	memcpy(join_msg.domain, dlm->name, join_msg.name_len);
 
+	/* copy live node map to join message */
+	byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
+
 	status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
 				    sizeof(join_msg), node, &retval);
 	if (status < 0 && status != -ENOPROTOOPT) {
@@ -1036,98 +1151,106 @@
 	status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
 					sizeof(struct dlm_master_request),
 					dlm_master_request_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
 					sizeof(struct dlm_assert_master),
 					dlm_assert_master_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, dlm_assert_master_post_handler,
+					&dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
 					sizeof(struct dlm_create_lock),
 					dlm_create_lock_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
 					DLM_CONVERT_LOCK_MAX_LEN,
 					dlm_convert_lock_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
 					DLM_UNLOCK_LOCK_MAX_LEN,
 					dlm_unlock_lock_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
 					DLM_PROXY_AST_MAX_LEN,
 					dlm_proxy_ast_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
 					sizeof(struct dlm_exit_domain),
 					dlm_exit_domain_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
+	status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
+					sizeof(struct dlm_deref_lockres),
+					dlm_deref_lockres_handler,
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
 					sizeof(struct dlm_migrate_request),
 					dlm_migrate_request_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
 					DLM_MIG_LOCKRES_MAX_LEN,
 					dlm_mig_lockres_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
 					sizeof(struct dlm_master_requery),
 					dlm_master_requery_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
 					sizeof(struct dlm_lock_request),
 					dlm_request_all_locks_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
 					sizeof(struct dlm_reco_data_done),
 					dlm_reco_data_done_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
 					sizeof(struct dlm_begin_reco),
 					dlm_begin_reco_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
 					sizeof(struct dlm_finalize_reco),
 					dlm_finalize_reco_handler,
-					dlm, &dlm->dlm_domain_handlers);
+					dlm, NULL, &dlm->dlm_domain_handlers);
 	if (status)
 		goto bail;
 
@@ -1141,6 +1264,8 @@
 static int dlm_join_domain(struct dlm_ctxt *dlm)
 {
 	int status;
+	unsigned int backoff;
+	unsigned int total_backoff = 0;
 
 	BUG_ON(!dlm);
 
@@ -1172,18 +1297,27 @@
 	}
 
 	do {
-		unsigned int backoff;
 		status = dlm_try_to_join_domain(dlm);
 
 		/* If we're racing another node to the join, then we
 		 * need to back off temporarily and let them
 		 * complete. */
+#define	DLM_JOIN_TIMEOUT_MSECS	90000
 		if (status == -EAGAIN) {
 			if (signal_pending(current)) {
 				status = -ERESTARTSYS;
 				goto bail;
 			}
 
+			if (total_backoff >
+			    msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
+				status = -ERESTARTSYS;
+				mlog(ML_NOTICE, "Timed out joining dlm domain "
+				     "%s after %u msecs\n", dlm->name,
+				     jiffies_to_msecs(total_backoff));
+				goto bail;
+			}
+
 			/*
 			 * <chip> After you!
 			 * <dale> No, after you!
@@ -1193,6 +1327,7 @@
 			 */
 			backoff = (unsigned int)(jiffies & 0x3);
 			backoff *= DLM_DOMAIN_BACKOFF_MS;
+			total_backoff += backoff;
 			mlog(0, "backoff %d\n", backoff);
 			msleep(backoff);
 		}
@@ -1421,21 +1556,21 @@
 	status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
 					sizeof(struct dlm_query_join_request),
 					dlm_query_join_handler,
-					NULL, &dlm_join_handlers);
+					NULL, NULL, &dlm_join_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
 					sizeof(struct dlm_assert_joined),
 					dlm_assert_joined_handler,
-					NULL, &dlm_join_handlers);
+					NULL, NULL, &dlm_join_handlers);
 	if (status)
 		goto bail;
 
 	status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
 					sizeof(struct dlm_cancel_join),
 					dlm_cancel_join_handler,
-					NULL, &dlm_join_handlers);
+					NULL, NULL, &dlm_join_handlers);
 
 bail:
 	if (status < 0)
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index e5ca3db..52578d9 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -163,6 +163,10 @@
 			kick_thread = 1;
 		}
 	}
+	/* reduce the inflight count, this may result in the lockres
+	 * being purged below during calc_usage */
+	if (lock->ml.node == dlm->node_num)
+		dlm_lockres_drop_inflight_ref(dlm, res);
 
 	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
@@ -437,7 +441,8 @@
  *   held on exit:  none
  * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
  */
-int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+			    void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 0ad8720..77e4e61 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -99,9 +99,10 @@
 			    int idx);
 
 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
-				unsigned int namelen, void *nodemap,
-				u32 flags);
+static int dlm_do_assert_master(struct dlm_ctxt *dlm,
+				struct dlm_lock_resource *res,
+				void *nodemap, u32 flags);
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
 
 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
 				struct dlm_master_list_entry *mle,
@@ -237,7 +238,8 @@
 			struct dlm_master_list_entry **mle,
 			char *name, unsigned int namelen);
 
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+				 struct dlm_master_list_entry *mle, int to);
 
 
 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
@@ -687,6 +689,7 @@
 	INIT_LIST_HEAD(&res->purge);
 	atomic_set(&res->asts_reserved, 0);
 	res->migration_pending = 0;
+	res->inflight_locks = 0;
 
 	kref_init(&res->refs);
 
@@ -700,6 +703,7 @@
 	res->last_used = 0;
 
 	memset(res->lvb, 0, DLM_LVB_LEN);
+	memset(res->refmap, 0, sizeof(res->refmap));
 }
 
 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
@@ -722,6 +726,42 @@
 	return res;
 }
 
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+				   struct dlm_lock_resource *res,
+				   int new_lockres,
+				   const char *file,
+				   int line)
+{
+	if (!new_lockres)
+		assert_spin_locked(&res->spinlock);
+
+	if (!test_bit(dlm->node_num, res->refmap)) {
+		BUG_ON(res->inflight_locks != 0);
+		dlm_lockres_set_refmap_bit(dlm->node_num, res);
+	}
+	res->inflight_locks++;
+	mlog(0, "%s:%.*s: inflight++: now %u\n",
+	     dlm->name, res->lockname.len, res->lockname.name,
+	     res->inflight_locks);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+				   struct dlm_lock_resource *res,
+				   const char *file,
+				   int line)
+{
+	assert_spin_locked(&res->spinlock);
+
+	BUG_ON(res->inflight_locks == 0);
+	res->inflight_locks--;
+	mlog(0, "%s:%.*s: inflight--: now %u\n",
+	     dlm->name, res->lockname.len, res->lockname.name,
+	     res->inflight_locks);
+	if (res->inflight_locks == 0)
+		dlm_lockres_clear_refmap_bit(dlm->node_num, res);
+	wake_up(&res->wq);
+}
+
 /*
  * lookup a lock resource by name.
  * may already exist in the hashtable.
@@ -752,6 +792,7 @@
 	unsigned int hash;
 	int tries = 0;
 	int bit, wait_on_recovery = 0;
+	int drop_inflight_if_nonlocal = 0;
 
 	BUG_ON(!lockid);
 
@@ -761,9 +802,30 @@
 
 lookup:
 	spin_lock(&dlm->spinlock);
-	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
+	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
 	if (tmpres) {
+		int dropping_ref = 0;
+
+		spin_lock(&tmpres->spinlock);
+		if (tmpres->owner == dlm->node_num) {
+			BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
+			dlm_lockres_grab_inflight_ref(dlm, tmpres);
+		} else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
+			dropping_ref = 1;
+		spin_unlock(&tmpres->spinlock);
 		spin_unlock(&dlm->spinlock);
+
+		/* wait until done messaging the master, drop our ref to allow
+		 * the lockres to be purged, start over. */
+		if (dropping_ref) {
+			spin_lock(&tmpres->spinlock);
+			__dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
+			spin_unlock(&tmpres->spinlock);
+			dlm_lockres_put(tmpres);
+			tmpres = NULL;
+			goto lookup;
+		}
+
 		mlog(0, "found in hash!\n");
 		if (res)
 			dlm_lockres_put(res);
@@ -793,6 +855,7 @@
 		spin_lock(&res->spinlock);
 		dlm_change_lockres_owner(dlm, res, dlm->node_num);
 		__dlm_insert_lockres(dlm, res);
+		dlm_lockres_grab_inflight_ref(dlm, res);
 		spin_unlock(&res->spinlock);
 		spin_unlock(&dlm->spinlock);
 		/* lockres still marked IN_PROGRESS */
@@ -805,29 +868,40 @@
 	/* if we found a block, wait for lock to be mastered by another node */
 	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
 	if (blocked) {
+		int mig;
 		if (mle->type == DLM_MLE_MASTER) {
 			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
 			BUG();
-		} else if (mle->type == DLM_MLE_MIGRATION) {
-			/* migration is in progress! */
-			/* the good news is that we now know the
-			 * "current" master (mle->master). */
-
+		}
+		mig = (mle->type == DLM_MLE_MIGRATION);
+		/* if there is a migration in progress, let the migration
+		 * finish before continuing.  we can wait for the absence
+		 * of the MIGRATION mle: either the migrate finished or
+		 * one of the nodes died and the mle was cleaned up.
+		 * if there is a BLOCK here, but it already has a master
+		 * set, we are too late.  the master does not have a ref
+		 * for us in the refmap.  detach the mle and drop it.
+		 * either way, go back to the top and start over. */
+		if (mig || mle->master != O2NM_MAX_NODES) {
+			BUG_ON(mig && mle->master == dlm->node_num);
+			/* we arrived too late.  the master does not
+			 * have a ref for us. retry. */
+			mlog(0, "%s:%.*s: late on %s\n",
+			     dlm->name, namelen, lockid,
+			     mig ?  "MIGRATION" : "BLOCK");
 			spin_unlock(&dlm->master_lock);
-			assert_spin_locked(&dlm->spinlock);
-
-			/* set the lockres owner and hash it */
-			spin_lock(&res->spinlock);
-			dlm_set_lockres_owner(dlm, res, mle->master);
-			__dlm_insert_lockres(dlm, res);
-			spin_unlock(&res->spinlock);
 			spin_unlock(&dlm->spinlock);
 
 			/* master is known, detach */
-			dlm_mle_detach_hb_events(dlm, mle);
+			if (!mig)
+				dlm_mle_detach_hb_events(dlm, mle);
 			dlm_put_mle(mle);
 			mle = NULL;
-			goto wake_waiters;
+			/* this is lame, but we cant wait on either
+			 * the mle or lockres waitqueue here */
+			if (mig)
+				msleep(100);
+			goto lookup;
 		}
 	} else {
 		/* go ahead and try to master lock on this node */
@@ -858,6 +932,13 @@
 
 	/* finally add the lockres to its hash bucket */
 	__dlm_insert_lockres(dlm, res);
+	/* since this lockres is new it doesnt not require the spinlock */
+	dlm_lockres_grab_inflight_ref_new(dlm, res);
+
+	/* if this node does not become the master make sure to drop
+	 * this inflight reference below */
+	drop_inflight_if_nonlocal = 1;
+
 	/* get an extra ref on the mle in case this is a BLOCK
 	 * if so, the creator of the BLOCK may try to put the last
 	 * ref at this time in the assert master handler, so we
@@ -910,7 +991,7 @@
 	ret = -EINVAL;
 	dlm_node_iter_init(mle->vote_map, &iter);
 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
-		ret = dlm_do_master_request(mle, nodenum);
+		ret = dlm_do_master_request(res, mle, nodenum);
 		if (ret < 0)
 			mlog_errno(ret);
 		if (mle->master != O2NM_MAX_NODES) {
@@ -960,6 +1041,8 @@
 
 wake_waiters:
 	spin_lock(&res->spinlock);
+	if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
+		dlm_lockres_drop_inflight_ref(dlm, res);
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 	spin_unlock(&res->spinlock);
 	wake_up(&res->wq);
@@ -998,7 +1081,7 @@
 		/* this will cause the master to re-assert across
 		 * the whole cluster, freeing up mles */
 		if (res->owner != dlm->node_num) {
-			ret = dlm_do_master_request(mle, res->owner);
+			ret = dlm_do_master_request(res, mle, res->owner);
 			if (ret < 0) {
 				/* give recovery a chance to run */
 				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
@@ -1062,6 +1145,8 @@
 			 	 * now tell other nodes that I am
 				 * mastering this. */
 				mle->master = dlm->node_num;
+				/* ref was grabbed in get_lock_resource
+				 * will be dropped in dlmlock_master */
 				assert = 1;
 				sleep = 0;
 			}
@@ -1087,7 +1172,8 @@
 					 (atomic_read(&mle->woken) == 1),
 					 timeo);
 		if (res->owner == O2NM_MAX_NODES) {
-			mlog(0, "waiting again\n");
+			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
+			     res->lockname.len, res->lockname.name);
 			goto recheck;
 		}
 		mlog(0, "done waiting, master is %u\n", res->owner);
@@ -1100,8 +1186,7 @@
 		m = dlm->node_num;
 		mlog(0, "about to master %.*s here, this=%u\n",
 		     res->lockname.len, res->lockname.name, m);
-		ret = dlm_do_assert_master(dlm, res->lockname.name,
-					   res->lockname.len, mle->vote_map, 0);
+		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
 		if (ret) {
 			/* This is a failure in the network path,
 			 * not in the response to the assert_master
@@ -1117,6 +1202,8 @@
 
 	/* set the lockres owner */
 	spin_lock(&res->spinlock);
+	/* mastery reference obtained either during
+	 * assert_master_handler or in get_lock_resource */
 	dlm_change_lockres_owner(dlm, res, m);
 	spin_unlock(&res->spinlock);
 
@@ -1283,7 +1370,8 @@
  *
  */
 
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+				 struct dlm_master_list_entry *mle, int to)
 {
 	struct dlm_ctxt *dlm = mle->dlm;
 	struct dlm_master_request request;
@@ -1339,6 +1427,9 @@
 		case DLM_MASTER_RESP_YES:
 			set_bit(to, mle->response_map);
 			mlog(0, "node %u is the master, response=YES\n", to);
+			mlog(0, "%s:%.*s: master node %u now knows I have a "
+			     "reference\n", dlm->name, res->lockname.len,
+			     res->lockname.name, to);
 			mle->master = to;
 			break;
 		case DLM_MASTER_RESP_NO:
@@ -1379,7 +1470,8 @@
  *
  * if possible, TRIM THIS DOWN!!!
  */
-int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
+			       void **ret_data)
 {
 	u8 response = DLM_MASTER_RESP_MAYBE;
 	struct dlm_ctxt *dlm = data;
@@ -1417,10 +1509,11 @@
 
 		/* take care of the easy cases up front */
 		spin_lock(&res->spinlock);
-		if (res->state & DLM_LOCK_RES_RECOVERING) {
+		if (res->state & (DLM_LOCK_RES_RECOVERING|
+				  DLM_LOCK_RES_MIGRATING)) {
 			spin_unlock(&res->spinlock);
 			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
-			     "being recovered\n");
+			     "being recovered/migrated\n");
 			response = DLM_MASTER_RESP_ERROR;
 			if (mle)
 				kmem_cache_free(dlm_mle_cache, mle);
@@ -1428,8 +1521,10 @@
 		}
 
 		if (res->owner == dlm->node_num) {
+			mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+			     dlm->name, namelen, name, request->node_idx);
+			dlm_lockres_set_refmap_bit(request->node_idx, res);
 			spin_unlock(&res->spinlock);
-			// mlog(0, "this node is the master\n");
 			response = DLM_MASTER_RESP_YES;
 			if (mle)
 				kmem_cache_free(dlm_mle_cache, mle);
@@ -1477,7 +1572,6 @@
 			mlog(0, "node %u is master, but trying to migrate to "
 			     "node %u.\n", tmpmle->master, tmpmle->new_master);
 			if (tmpmle->master == dlm->node_num) {
-				response = DLM_MASTER_RESP_YES;
 				mlog(ML_ERROR, "no owner on lockres, but this "
 				     "node is trying to migrate it to %u?!\n",
 				     tmpmle->new_master);
@@ -1494,6 +1588,10 @@
 				 * go back and clean the mles on any
 				 * other nodes */
 				dispatch_assert = 1;
+				dlm_lockres_set_refmap_bit(request->node_idx, res);
+				mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+				     dlm->name, namelen, name,
+				     request->node_idx);
 			} else
 				response = DLM_MASTER_RESP_NO;
 		} else {
@@ -1607,17 +1705,24 @@
  * can periodically run all locks owned by this node
  * and re-assert across the cluster...
  */
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
-				unsigned int namelen, void *nodemap,
-				u32 flags)
+int dlm_do_assert_master(struct dlm_ctxt *dlm,
+			 struct dlm_lock_resource *res,
+			 void *nodemap, u32 flags)
 {
 	struct dlm_assert_master assert;
 	int to, tmpret;
 	struct dlm_node_iter iter;
 	int ret = 0;
 	int reassert;
+	const char *lockname = res->lockname.name;
+	unsigned int namelen = res->lockname.len;
 
 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+	spin_lock(&res->spinlock);
+	res->state |= DLM_LOCK_RES_SETREF_INPROG;
+	spin_unlock(&res->spinlock);
+
 again:
 	reassert = 0;
 
@@ -1647,6 +1752,7 @@
 			mlog(0, "link to %d went down!\n", to);
 			/* any nonzero status return will do */
 			ret = tmpret;
+			r = 0;
 		} else if (r < 0) {
 			/* ok, something horribly messed.  kill thyself. */
 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
@@ -1661,17 +1767,39 @@
 			spin_unlock(&dlm->master_lock);
 			spin_unlock(&dlm->spinlock);
 			BUG();
-		} else if (r == EAGAIN) {
+		}
+
+		if (r & DLM_ASSERT_RESPONSE_REASSERT &&
+		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
+				mlog(ML_ERROR, "%.*s: very strange, "
+				     "master MLE but no lockres on %u\n",
+				     namelen, lockname, to);
+		}
+
+		if (r & DLM_ASSERT_RESPONSE_REASSERT) {
 			mlog(0, "%.*s: node %u create mles on other "
 			     "nodes and requests a re-assert\n", 
 			     namelen, lockname, to);
 			reassert = 1;
 		}
+		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
+			mlog(0, "%.*s: node %u has a reference to this "
+			     "lockres, set the bit in the refmap\n",
+			     namelen, lockname, to);
+			spin_lock(&res->spinlock);
+			dlm_lockres_set_refmap_bit(to, res);
+			spin_unlock(&res->spinlock);
+		}
 	}
 
 	if (reassert)
 		goto again;
 
+	spin_lock(&res->spinlock);
+	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+	spin_unlock(&res->spinlock);
+	wake_up(&res->wq);
+
 	return ret;
 }
 
@@ -1684,7 +1812,8 @@
  *
  * if possible, TRIM THIS DOWN!!!
  */
-int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
+			      void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_master_list_entry *mle = NULL;
@@ -1693,7 +1822,7 @@
 	char *name;
 	unsigned int namelen, hash;
 	u32 flags;
-	int master_request = 0;
+	int master_request = 0, have_lockres_ref = 0;
 	int ret = 0;
 
 	if (!dlm_grab(dlm))
@@ -1851,6 +1980,7 @@
 		spin_unlock(&mle->spinlock);
 
 		if (res) {
+			int wake = 0;
 			spin_lock(&res->spinlock);
 			if (mle->type == DLM_MLE_MIGRATION) {
 				mlog(0, "finishing off migration of lockres %.*s, "
@@ -1858,12 +1988,16 @@
 			       		res->lockname.len, res->lockname.name,
 			       		dlm->node_num, mle->new_master);
 				res->state &= ~DLM_LOCK_RES_MIGRATING;
+				wake = 1;
 				dlm_change_lockres_owner(dlm, res, mle->new_master);
 				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
 			} else {
 				dlm_change_lockres_owner(dlm, res, mle->master);
 			}
 			spin_unlock(&res->spinlock);
+			have_lockres_ref = 1;
+			if (wake)
+				wake_up(&res->wq);
 		}
 
 		/* master is known, detach if not already detached.
@@ -1913,12 +2047,28 @@
 
 done:
 	ret = 0;
-	if (res)
-		dlm_lockres_put(res);
+	if (res) {
+		spin_lock(&res->spinlock);
+		res->state |= DLM_LOCK_RES_SETREF_INPROG;
+		spin_unlock(&res->spinlock);
+		*ret_data = (void *)res;
+	}
 	dlm_put(dlm);
 	if (master_request) {
 		mlog(0, "need to tell master to reassert\n");
-		ret = EAGAIN;  // positive. negative would shoot down the node.
+		/* positive. negative would shoot down the node. */
+		ret |= DLM_ASSERT_RESPONSE_REASSERT;
+		if (!have_lockres_ref) {
+			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
+			     "mle present here for %s:%.*s, but no lockres!\n",
+			     assert->node_idx, dlm->name, namelen, name);
+		}
+	}
+	if (have_lockres_ref) {
+		/* let the master know we have a reference to the lockres */
+		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
+		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
+		     dlm->name, namelen, name, assert->node_idx);
 	}
 	return ret;
 
@@ -1929,11 +2079,25 @@
 	__dlm_print_one_lock_resource(res);
 	spin_unlock(&res->spinlock);
 	spin_unlock(&dlm->spinlock);
-	dlm_lockres_put(res);
+	*ret_data = (void *)res; 
 	dlm_put(dlm);
 	return -EINVAL;
 }
 
+void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
+{
+	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
+
+	if (ret_data) {
+		spin_lock(&res->spinlock);
+		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+		spin_unlock(&res->spinlock);
+		wake_up(&res->wq);
+		dlm_lockres_put(res);
+	}
+	return;
+}
+
 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 			       struct dlm_lock_resource *res,
 			       int ignore_higher, u8 request_from, u32 flags)
@@ -2023,9 +2187,7 @@
 	 * even if one or more nodes die */
 	mlog(0, "worker about to master %.*s here, this=%u\n",
 		     res->lockname.len, res->lockname.name, dlm->node_num);
-	ret = dlm_do_assert_master(dlm, res->lockname.name,
-				   res->lockname.len,
-				   nodemap, flags);
+	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
 	if (ret < 0) {
 		/* no need to restart, we are done */
 		if (!dlm_is_host_down(ret))
@@ -2097,14 +2259,180 @@
 	return ret;
 }
 
+/*
+ * DLM_DEREF_LOCKRES_MSG
+ */
+
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+	struct dlm_deref_lockres deref;
+	int ret = 0, r;
+	const char *lockname;
+	unsigned int namelen;
+
+	lockname = res->lockname.name;
+	namelen = res->lockname.len;
+	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+	mlog(0, "%s:%.*s: sending deref to %d\n",
+	     dlm->name, namelen, lockname, res->owner);
+	memset(&deref, 0, sizeof(deref));
+	deref.node_idx = dlm->node_num;
+	deref.namelen = namelen;
+	memcpy(deref.name, lockname, namelen);
+
+	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
+				 &deref, sizeof(deref), res->owner, &r);
+	if (ret < 0)
+		mlog_errno(ret);
+	else if (r < 0) {
+		/* BAD.  other node says I did not have a ref. */
+		mlog(ML_ERROR,"while dropping ref on %s:%.*s "
+		    "(master=%u) got %d.\n", dlm->name, namelen,
+		    lockname, res->owner, r);
+		dlm_print_one_lock_resource(res);
+		BUG();
+	}
+	return ret;
+}
+
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+			      void **ret_data)
+{
+	struct dlm_ctxt *dlm = data;
+	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
+	struct dlm_lock_resource *res = NULL;
+	char *name;
+	unsigned int namelen;
+	int ret = -EINVAL;
+	u8 node;
+	unsigned int hash;
+	struct dlm_work_item *item;
+	int cleared = 0;
+	int dispatch = 0;
+
+	if (!dlm_grab(dlm))
+		return 0;
+
+	name = deref->name;
+	namelen = deref->namelen;
+	node = deref->node_idx;
+
+	if (namelen > DLM_LOCKID_NAME_MAX) {
+		mlog(ML_ERROR, "Invalid name length!");
+		goto done;
+	}
+	if (deref->node_idx >= O2NM_MAX_NODES) {
+		mlog(ML_ERROR, "Invalid node number: %u\n", node);
+		goto done;
+	}
+
+	hash = dlm_lockid_hash(name, namelen);
+
+	spin_lock(&dlm->spinlock);
+	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
+	if (!res) {
+		spin_unlock(&dlm->spinlock);
+		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
+		     dlm->name, namelen, name);
+		goto done;
+	}
+	spin_unlock(&dlm->spinlock);
+
+	spin_lock(&res->spinlock);
+	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
+		dispatch = 1;
+	else {
+		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+		if (test_bit(node, res->refmap)) {
+			dlm_lockres_clear_refmap_bit(node, res);
+			cleared = 1;
+		}
+	}
+	spin_unlock(&res->spinlock);
+
+	if (!dispatch) {
+		if (cleared)
+			dlm_lockres_calc_usage(dlm, res);
+		else {
+			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+		     	"but it is already dropped!\n", dlm->name,
+		     	res->lockname.len, res->lockname.name, node);
+			__dlm_print_one_lock_resource(res);
+		}
+		ret = 0;
+		goto done;
+	}
+
+	item = kzalloc(sizeof(*item), GFP_NOFS);
+	if (!item) {
+		ret = -ENOMEM;
+		mlog_errno(ret);
+		goto done;
+	}
+
+	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
+	item->u.dl.deref_res = res;
+	item->u.dl.deref_node = node;
+
+	spin_lock(&dlm->work_lock);
+	list_add_tail(&item->list, &dlm->work_list);
+	spin_unlock(&dlm->work_lock);
+
+	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
+	return 0;
+
+done:
+	if (res)
+		dlm_lockres_put(res);
+	dlm_put(dlm);
+
+	return ret;
+}
+
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
+{
+	struct dlm_ctxt *dlm;
+	struct dlm_lock_resource *res;
+	u8 node;
+	u8 cleared = 0;
+
+	dlm = item->dlm;
+	res = item->u.dl.deref_res;
+	node = item->u.dl.deref_node;
+
+	spin_lock(&res->spinlock);
+	BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+	if (test_bit(node, res->refmap)) {
+		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+		dlm_lockres_clear_refmap_bit(node, res);
+		cleared = 1;
+	}
+	spin_unlock(&res->spinlock);
+
+	if (cleared) {
+		mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
+		     dlm->name, res->lockname.len, res->lockname.name, node);
+		dlm_lockres_calc_usage(dlm, res);
+	} else {
+		mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+		     "but it is already dropped!\n", dlm->name,
+		     res->lockname.len, res->lockname.name, node);
+		__dlm_print_one_lock_resource(res);
+	}
+
+	dlm_lockres_put(res);
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES
  */
 
 
-int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
-			u8 target)
+static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
+			       struct dlm_lock_resource *res,
+			       u8 target)
 {
 	struct dlm_master_list_entry *mle = NULL;
 	struct dlm_master_list_entry *oldmle = NULL;
@@ -2116,7 +2444,7 @@
 	struct list_head *queue, *iter;
 	int i;
 	struct dlm_lock *lock;
-	int empty = 1;
+	int empty = 1, wake = 0;
 
 	if (!dlm_grab(dlm))
 		return -EINVAL;
@@ -2241,6 +2569,7 @@
 		     res->lockname.name, target);
 		spin_lock(&res->spinlock);
 		res->state &= ~DLM_LOCK_RES_MIGRATING;
+		wake = 1;
 		spin_unlock(&res->spinlock);
 		ret = -EINVAL;
 	}
@@ -2268,6 +2597,9 @@
 	 * the lockres
 	 */
 
+	/* now that remote nodes are spinning on the MIGRATING flag,
+	 * ensure that all assert_master work is flushed. */
+	flush_workqueue(dlm->dlm_worker);
 
 	/* get an extra reference on the mle.
 	 * otherwise the assert_master from the new
@@ -2296,6 +2628,7 @@
 		dlm_put_mle_inuse(mle);
 		spin_lock(&res->spinlock);
 		res->state &= ~DLM_LOCK_RES_MIGRATING;
+		wake = 1;
 		spin_unlock(&res->spinlock);
 		goto leave;
 	}
@@ -2322,7 +2655,8 @@
 			    res->owner == target)
 				break;
 
-			mlog(0, "timed out during migration\n");
+			mlog(0, "%s:%.*s: timed out during migration\n",
+			     dlm->name, res->lockname.len, res->lockname.name);
 			/* avoid hang during shutdown when migrating lockres 
 			 * to a node which also goes down */
 			if (dlm_is_node_dead(dlm, target)) {
@@ -2330,20 +2664,20 @@
 				     "target %u is no longer up, restarting\n",
 				     dlm->name, res->lockname.len,
 				     res->lockname.name, target);
-				ret = -ERESTARTSYS;
+				ret = -EINVAL;
+				/* migration failed, detach and clean up mle */
+				dlm_mle_detach_hb_events(dlm, mle);
+				dlm_put_mle(mle);
+				dlm_put_mle_inuse(mle);
+				spin_lock(&res->spinlock);
+				res->state &= ~DLM_LOCK_RES_MIGRATING;
+				wake = 1;
+				spin_unlock(&res->spinlock);
+				goto leave;
 			}
-		}
-		if (ret == -ERESTARTSYS) {
-			/* migration failed, detach and clean up mle */
-			dlm_mle_detach_hb_events(dlm, mle);
-			dlm_put_mle(mle);
-			dlm_put_mle_inuse(mle);
-			spin_lock(&res->spinlock);
-			res->state &= ~DLM_LOCK_RES_MIGRATING;
-			spin_unlock(&res->spinlock);
-			goto leave;
-		}
-		/* TODO: if node died: stop, clean up, return error */
+		} else
+			mlog(0, "%s:%.*s: caught signal during migration\n",
+			     dlm->name, res->lockname.len, res->lockname.name);
 	}
 
 	/* all done, set the owner, clear the flag */
@@ -2366,6 +2700,11 @@
 	if (ret < 0)
 		dlm_kick_thread(dlm, res);
 
+	/* wake up waiters if the MIGRATING flag got set
+	 * but migration failed */
+	if (wake)
+		wake_up(&res->wq);
+
 	/* TODO: cleanup */
 	if (mres)
 		free_page((unsigned long)mres);
@@ -2376,6 +2715,53 @@
 	return ret;
 }
 
+#define DLM_MIGRATION_RETRY_MS  100
+
+/* Should be called only after beginning the domain leave process.
+ * There should not be any remaining locks on nonlocal lock resources,
+ * and there should be no local locks left on locally mastered resources.
+ *
+ * Called with the dlm spinlock held, may drop it to do migration, but
+ * will re-acquire before exit.
+ *
+ * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+	int ret;
+	int lock_dropped = 0;
+
+	if (res->owner != dlm->node_num) {
+		if (!__dlm_lockres_unused(res)) {
+			mlog(ML_ERROR, "%s:%.*s: this node is not master, "
+			     "trying to free this but locks remain\n",
+			     dlm->name, res->lockname.len, res->lockname.name);
+		}
+		goto leave;
+	}
+
+	/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
+	spin_unlock(&dlm->spinlock);
+	lock_dropped = 1;
+	while (1) {
+		ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
+		if (ret >= 0)
+			break;
+		if (ret == -ENOTEMPTY) {
+			mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
+		     		res->lockname.len, res->lockname.name);
+			BUG();
+		}
+
+		mlog(0, "lockres %.*s: migrate failed, "
+		     "retrying\n", res->lockname.len,
+		     res->lockname.name);
+		msleep(DLM_MIGRATION_RETRY_MS);
+	}
+	spin_lock(&dlm->spinlock);
+leave:
+	return lock_dropped;
+}
+
 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
 {
 	int ret;
@@ -2405,7 +2791,8 @@
 	return can_proceed;
 }
 
-int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
+				struct dlm_lock_resource *res)
 {
 	int ret;
 	spin_lock(&res->spinlock);
@@ -2434,8 +2821,15 @@
 	__dlm_lockres_reserve_ast(res);
 	spin_unlock(&res->spinlock);
 
-	/* now flush all the pending asts.. hang out for a bit */
+	/* now flush all the pending asts */
 	dlm_kick_thread(dlm, res);
+	/* before waiting on DIRTY, block processes which may
+	 * try to dirty the lockres before MIGRATING is set */
+	spin_lock(&res->spinlock);
+	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
+	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
+	spin_unlock(&res->spinlock);
+	/* now wait on any pending asts and the DIRTY state */
 	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
 	dlm_lockres_release_ast(dlm, res);
 
@@ -2461,6 +2855,13 @@
 		mlog(0, "trying again...\n");
 		goto again;
 	}
+	/* now that we are sure the MIGRATING state is there, drop
+	 * the unneded state which blocked threads trying to DIRTY */
+	spin_lock(&res->spinlock);
+	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
+	BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
+	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
+	spin_unlock(&res->spinlock);
 
 	/* did the target go down or die? */
 	spin_lock(&dlm->spinlock);
@@ -2490,7 +2891,7 @@
 {
 	struct list_head *iter, *iter2;
 	struct list_head *queue = &res->granted;
-	int i;
+	int i, bit;
 	struct dlm_lock *lock;
 
 	assert_spin_locked(&res->spinlock);
@@ -2508,12 +2909,28 @@
 				BUG_ON(!list_empty(&lock->bast_list));
 				BUG_ON(lock->ast_pending);
 				BUG_ON(lock->bast_pending);
+				dlm_lockres_clear_refmap_bit(lock->ml.node, res);
 				list_del_init(&lock->list);
 				dlm_lock_put(lock);
 			}
 		}
 		queue++;
 	}
+	bit = 0;
+	while (1) {
+		bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
+		if (bit >= O2NM_MAX_NODES)
+			break;
+		/* do not clear the local node reference, if there is a
+		 * process holding this, let it drop the ref itself */
+		if (bit != dlm->node_num) {
+			mlog(0, "%s:%.*s: node %u had a ref to this "
+			     "migrating lockres, clearing\n", dlm->name,
+			     res->lockname.len, res->lockname.name, bit);
+			dlm_lockres_clear_refmap_bit(bit, res);
+		}
+		bit++;
+	}
 }
 
 /* for now this is not too intelligent.  we will
@@ -2601,6 +3018,16 @@
 			mlog(0, "migrate request (node %u) returned %d!\n",
 			     nodenum, status);
 			ret = status;
+		} else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
+			/* during the migration request we short-circuited
+			 * the mastery of the lockres.  make sure we have
+			 * a mastery ref for nodenum */
+			mlog(0, "%s:%.*s: need ref for node %u\n",
+			     dlm->name, res->lockname.len, res->lockname.name,
+			     nodenum);
+			spin_lock(&res->spinlock);
+			dlm_lockres_set_refmap_bit(nodenum, res);
+			spin_unlock(&res->spinlock);
 		}
 	}
 
@@ -2619,7 +3046,8 @@
  * we will have no mle in the list to start with.  now we can add an mle for
  * the migration and this should be the only one found for those scanning the
  * list.  */
-int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
+				void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_lock_resource *res = NULL;
@@ -2745,7 +3173,13 @@
 			/* remove it from the list so that only one
 			 * mle will be found */
 			list_del_init(&tmp->list);
-			__dlm_mle_detach_hb_events(dlm, mle);
+			/* this was obviously WRONG.  mle is uninited here.  should be tmp. */
+			__dlm_mle_detach_hb_events(dlm, tmp);
+			ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
+			mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
+			    "telling master to get ref for cleared out mle "
+			    "during migration\n", dlm->name, namelen, name,
+			    master, new_master);
 		}
 		spin_unlock(&tmp->spinlock);
 	}
@@ -2753,6 +3187,8 @@
 	/* now add a migration mle to the tail of the list */
 	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
 	mle->new_master = new_master;
+	/* the new master will be sending an assert master for this.
+	 * at that point we will get the refmap reference */
 	mle->master = master;
 	/* do this for consistency with other mle types */
 	set_bit(new_master, mle->maybe_map);
@@ -2902,6 +3338,13 @@
 	clear_bit(dlm->node_num, iter.node_map);
 	spin_unlock(&dlm->spinlock);
 
+	/* ownership of the lockres is changing.  account for the
+	 * mastery reference here since old_master will briefly have
+	 * a reference after the migration completes */
+	spin_lock(&res->spinlock);
+	dlm_lockres_set_refmap_bit(old_master, res);
+	spin_unlock(&res->spinlock);
+
 	mlog(0, "now time to do a migrate request to other nodes\n");
 	ret = dlm_do_migrate_request(dlm, res, old_master,
 				     dlm->node_num, &iter);
@@ -2914,8 +3357,7 @@
 	     res->lockname.len, res->lockname.name);
 	/* this call now finishes out the nodemap
 	 * even if one or more nodes die */
-	ret = dlm_do_assert_master(dlm, res->lockname.name,
-				   res->lockname.len, iter.node_map,
+	ret = dlm_do_assert_master(dlm, res, iter.node_map,
 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
 	if (ret < 0) {
 		/* no longer need to retry.  all living nodes contacted. */
@@ -2927,8 +3369,7 @@
 	set_bit(old_master, iter.node_map);
 	mlog(0, "doing assert master of %.*s back to %u\n",
 	     res->lockname.len, res->lockname.name, old_master);
-	ret = dlm_do_assert_master(dlm, res->lockname.name,
-				   res->lockname.len, iter.node_map,
+	ret = dlm_do_assert_master(dlm, res, iter.node_map,
 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
 	if (ret < 0) {
 		mlog(0, "assert master to original master failed "
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 367a11e..6d4a83d 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -163,9 +163,6 @@
 	dlm_workfunc_t *workfunc;
 	int tot=0;
 
-	if (!dlm_joined(dlm))
-		return;
-
 	spin_lock(&dlm->work_lock);
 	list_splice_init(&dlm->work_list, &tmp_list);
 	spin_unlock(&dlm->work_lock);
@@ -821,7 +818,8 @@
 
 }
 
-int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,
+				  void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;
@@ -978,7 +976,8 @@
 }
 
 
-int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
+			       void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
@@ -1129,6 +1128,11 @@
 	if (total_locks == mres_total_locks)
 		mres->flags |= DLM_MRES_ALL_DONE;
 
+	mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+	     dlm->name, res->lockname.len, res->lockname.name,
+	     orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+	     send_to);
+
 	/* send it */
 	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
 				 sz, send_to, &status);
@@ -1213,6 +1217,34 @@
 	return 0;
 }
 
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+			       struct dlm_migratable_lockres *mres)
+{
+	struct dlm_lock dummy;
+	memset(&dummy, 0, sizeof(dummy));
+	dummy.ml.cookie = 0;
+	dummy.ml.type = LKM_IVMODE;
+	dummy.ml.convert_type = LKM_IVMODE;
+	dummy.ml.highest_blocked = LKM_IVMODE;
+	dummy.lksb = NULL;
+	dummy.ml.node = dlm->node_num;
+	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+				    struct dlm_migratable_lock *ml,
+				    u8 *nodenum)
+{
+	if (unlikely(ml->cookie == 0 &&
+	    ml->type == LKM_IVMODE &&
+	    ml->convert_type == LKM_IVMODE &&
+	    ml->highest_blocked == LKM_IVMODE &&
+	    ml->list == DLM_BLOCKED_LIST)) {
+		*nodenum = ml->node;
+		return 1;
+	}
+	return 0;
+}
 
 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 			 struct dlm_migratable_lockres *mres,
@@ -1260,6 +1292,14 @@
 				goto error;
 		}
 	}
+	if (total_locks == 0) {
+		/* send a dummy lock to indicate a mastery reference only */
+		mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+		     dlm->name, res->lockname.len, res->lockname.name,
+		     send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+		     "migration");
+		dlm_add_dummy_lock(dlm, mres);
+	}
 	/* flush any remaining locks */
 	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
 	if (ret < 0)
@@ -1293,7 +1333,8 @@
  * do we spin?  returning an error only delays the problem really
  */
 
-int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+			    void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_migratable_lockres *mres =
@@ -1382,17 +1423,21 @@
 		spin_lock(&res->spinlock);
 		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 		spin_unlock(&res->spinlock);
+		wake_up(&res->wq);
 
 		/* add an extra ref for just-allocated lockres 
 		 * otherwise the lockres will be purged immediately */
 		dlm_lockres_get(res);
-
 	}
 
 	/* at this point we have allocated everything we need,
 	 * and we have a hashed lockres with an extra ref and
 	 * the proper res->state flags. */
 	ret = 0;
+	spin_lock(&res->spinlock);
+	/* drop this either when master requery finds a different master
+	 * or when a lock is added by the recovery worker */
+	dlm_lockres_grab_inflight_ref(dlm, res);
 	if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
 		/* migration cannot have an unknown master */
 		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1445,11 @@
 			  "unknown owner.. will need to requery: "
 			  "%.*s\n", mres->lockname_len, mres->lockname);
 	} else {
-		spin_lock(&res->spinlock);
+		/* take a reference now to pin the lockres, drop it
+		 * when locks are added in the worker */
 		dlm_change_lockres_owner(dlm, res, dlm->node_num);
-		spin_unlock(&res->spinlock);
 	}
+	spin_unlock(&res->spinlock);
 
 	/* queue up work for dlm_mig_lockres_worker */
 	dlm_grab(dlm);  /* get an extra ref for the work item */
@@ -1459,6 +1505,9 @@
 				   "this node will take it.\n",
 				   res->lockname.len, res->lockname.name);
 		} else {
+			spin_lock(&res->spinlock);
+			dlm_lockres_drop_inflight_ref(dlm, res);
+			spin_unlock(&res->spinlock);
 			mlog(0, "master needs to respond to sender "
 				  "that node %u still owns %.*s\n",
 				  real_master, res->lockname.len,
@@ -1578,7 +1627,8 @@
 /* this function cannot error, so unless the sending
  * or receiving of the message failed, the owner can
  * be trusted */
-int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
+			       void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
@@ -1660,21 +1710,38 @@
 {
 	struct dlm_migratable_lock *ml;
 	struct list_head *queue;
+	struct list_head *tmpq = NULL;
 	struct dlm_lock *newlock = NULL;
 	struct dlm_lockstatus *lksb = NULL;
 	int ret = 0;
-	int i, bad;
+	int i, j, bad;
 	struct list_head *iter;
 	struct dlm_lock *lock = NULL;
+	u8 from = O2NM_MAX_NODES;
+	unsigned int added = 0;
 
 	mlog(0, "running %d locks for this lockres\n", mres->num_locks);
 	for (i=0; i<mres->num_locks; i++) {
 		ml = &(mres->ml[i]);
+
+		if (dlm_is_dummy_lock(dlm, ml, &from)) {
+			/* placeholder, just need to set the refmap bit */
+			BUG_ON(mres->num_locks != 1);
+			mlog(0, "%s:%.*s: dummy lock for %u\n",
+			     dlm->name, mres->lockname_len, mres->lockname,
+			     from);
+			spin_lock(&res->spinlock);
+			dlm_lockres_set_refmap_bit(from, res);
+			spin_unlock(&res->spinlock);
+			added++;
+			break;
+		}
 		BUG_ON(ml->highest_blocked != LKM_IVMODE);
 		newlock = NULL;
 		lksb = NULL;
 
 		queue = dlm_list_num_to_pointer(res, ml->list);
+		tmpq = NULL;
 
 		/* if the lock is for the local node it needs to
 		 * be moved to the proper location within the queue.
@@ -1684,11 +1751,16 @@
 			BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
 
 			spin_lock(&res->spinlock);
-			list_for_each(iter, queue) {
-				lock = list_entry (iter, struct dlm_lock, list);
-				if (lock->ml.cookie != ml->cookie)
-					lock = NULL;
-				else
+			for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
+				tmpq = dlm_list_idx_to_ptr(res, j);
+				list_for_each(iter, tmpq) {
+					lock = list_entry (iter, struct dlm_lock, list);
+					if (lock->ml.cookie != ml->cookie)
+						lock = NULL;
+					else
+						break;
+				}
+				if (lock)
 					break;
 			}
 
@@ -1698,12 +1770,20 @@
 				u64 c = ml->cookie;
 				mlog(ML_ERROR, "could not find local lock "
 					       "with cookie %u:%llu!\n",
-					       dlm_get_lock_cookie_node(c),
-					       dlm_get_lock_cookie_seq(c));
+				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
+				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));
+				__dlm_print_one_lock_resource(res);
 				BUG();
 			}
 			BUG_ON(lock->ml.node != ml->node);
 
+			if (tmpq != queue) {
+				mlog(0, "lock was on %u instead of %u for %.*s\n",
+				     j, ml->list, res->lockname.len, res->lockname.name);
+				spin_unlock(&res->spinlock);
+				continue;
+			}
+
 			/* see NOTE above about why we do not update
 			 * to match the master here */
 
@@ -1711,6 +1791,7 @@
 			/* do not alter lock refcount.  switching lists. */
 			list_move_tail(&lock->list, queue);
 			spin_unlock(&res->spinlock);
+			added++;
 
 			mlog(0, "just reordered a local lock!\n");
 			continue;
@@ -1799,14 +1880,14 @@
 				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
 				     "exists on this lockres!\n", dlm->name,
 				     res->lockname.len, res->lockname.name,
-				     dlm_get_lock_cookie_node(c),
-				     dlm_get_lock_cookie_seq(c));
+				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
+				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));
 
 				mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
 				     "node=%u, cookie=%u:%llu, queue=%d\n",
 	      			     ml->type, ml->convert_type, ml->node,
-				     dlm_get_lock_cookie_node(ml->cookie),
-				     dlm_get_lock_cookie_seq(ml->cookie),
+				     dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
+				     dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
 				     ml->list);
 
 				__dlm_print_one_lock_resource(res);
@@ -1817,12 +1898,22 @@
 		if (!bad) {
 			dlm_lock_get(newlock);
 			list_add_tail(&newlock->list, queue);
+			mlog(0, "%s:%.*s: added lock for node %u, "
+			     "setting refmap bit\n", dlm->name,
+			     res->lockname.len, res->lockname.name, ml->node);
+			dlm_lockres_set_refmap_bit(ml->node, res);
+			added++;
 		}
 		spin_unlock(&res->spinlock);
 	}
 	mlog(0, "done running all the locks\n");
 
 leave:
+	/* balance the ref taken when the work was queued */
+	spin_lock(&res->spinlock);
+	dlm_lockres_drop_inflight_ref(dlm, res);
+	spin_unlock(&res->spinlock);
+
 	if (ret < 0) {
 		mlog_errno(ret);
 		if (newlock)
@@ -1935,9 +2026,11 @@
 		if (res->owner == dead_node) {
 			list_del_init(&res->recovering);
 			spin_lock(&res->spinlock);
+			/* new_master has our reference from
+			 * the lock state sent during recovery */
 			dlm_change_lockres_owner(dlm, res, new_master);
 			res->state &= ~DLM_LOCK_RES_RECOVERING;
-			if (!__dlm_lockres_unused(res))
+			if (__dlm_lockres_has_locks(res))
 				__dlm_dirty_lockres(dlm, res);
 			spin_unlock(&res->spinlock);
 			wake_up(&res->wq);
@@ -1977,9 +2070,11 @@
 					dlm_lockres_put(res);
 				}
 				spin_lock(&res->spinlock);
+				/* new_master has our reference from
+				 * the lock state sent during recovery */
 				dlm_change_lockres_owner(dlm, res, new_master);
 				res->state &= ~DLM_LOCK_RES_RECOVERING;
-				if (!__dlm_lockres_unused(res))
+				if (__dlm_lockres_has_locks(res))
 					__dlm_dirty_lockres(dlm, res);
 				spin_unlock(&res->spinlock);
 				wake_up(&res->wq);
@@ -2048,6 +2143,7 @@
 {
 	struct list_head *iter, *tmpiter;
 	struct dlm_lock *lock;
+	unsigned int freed = 0;
 
 	/* this node is the lockres master:
 	 * 1) remove any stale locks for the dead node
@@ -2062,6 +2158,7 @@
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2166,7 @@
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2174,23 @@
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 
+	if (freed) {
+		mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+		     "dropping ref from lockres\n", dlm->name,
+		     res->lockname.len, res->lockname.name, freed, dead_node);
+		BUG_ON(!test_bit(dead_node, res->refmap));
+		dlm_lockres_clear_refmap_bit(dead_node, res);
+	} else if (test_bit(dead_node, res->refmap)) {
+		mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+		     "no locks and had not purged before dying\n", dlm->name,
+		     res->lockname.len, res->lockname.name, dead_node);
+		dlm_lockres_clear_refmap_bit(dead_node, res);
+	}
+
 	/* do not kick thread yet */
 	__dlm_dirty_lockres(dlm, res);
 }
@@ -2141,9 +2253,21 @@
 			spin_lock(&res->spinlock);
 			/* zero the lvb if necessary */
 			dlm_revalidate_lvb(dlm, res, dead_node);
-			if (res->owner == dead_node)
+			if (res->owner == dead_node) {
+				if (res->state & DLM_LOCK_RES_DROPPING_REF)
+					mlog(0, "%s:%.*s: owned by "
+					     "dead node %u, this node was "
+					     "dropping its ref when it died. "
+					     "continue, dropping the flag.\n",
+					     dlm->name, res->lockname.len,
+					     res->lockname.name, dead_node);
+
+				/* the wake_up for this will happen when the
+				 * RECOVERING flag is dropped later */
+				res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
 				dlm_move_lockres_to_recovery_list(dlm, res);
-			else if (res->owner == dlm->node_num) {
+			} else if (res->owner == dlm->node_num) {
 				dlm_free_dead_locks(dlm, res, dead_node);
 				__dlm_lockres_calc_usage(dlm, res);
 			}
@@ -2480,7 +2604,8 @@
 	return ret;
 }
 
-int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+			   void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_begin_reco *br = (struct dlm_begin_reco *)msg->buf;
@@ -2608,7 +2733,8 @@
 	return ret;
 }
 
-int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
+			      void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index 0c822f3..8ffa091 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -54,9 +54,6 @@
 #include "cluster/masklog.h"
 
 static int dlm_thread(void *data);
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-				  struct dlm_lock_resource *lockres);
-
 static void dlm_flush_asts(struct dlm_ctxt *dlm);
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
@@ -82,14 +79,33 @@
 	current->state = TASK_RUNNING;
 }
 
-
-int __dlm_lockres_unused(struct dlm_lock_resource *res)
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
 {
 	if (list_empty(&res->granted) &&
 	    list_empty(&res->converting) &&
-	    list_empty(&res->blocked) &&
-	    list_empty(&res->dirty))
-		return 1;
+	    list_empty(&res->blocked))
+		return 0;
+	return 1;
+}
+
+/* "unused": the lockres has no locks, is not on the dirty list,
+ * has no inflight locks (in the gap between mastery and acquiring
+ * the first lock), and has no bits in its refmap.
+ * truly ready to be freed. */
+int __dlm_lockres_unused(struct dlm_lock_resource *res)
+{
+	if (!__dlm_lockres_has_locks(res) &&
+	    (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
+		/* try not to scan the bitmap unless the first two
+		 * conditions are already true */
+		int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
+		if (bit >= O2NM_MAX_NODES) {
+			/* since the bit for dlm->node_num is not
+			 * set, inflight_locks better be zero */
+			BUG_ON(res->inflight_locks != 0);
+			return 1;
+		}
+	}
 	return 0;
 }
 
@@ -106,46 +122,21 @@
 	assert_spin_locked(&res->spinlock);
 
 	if (__dlm_lockres_unused(res)){
-		/* For now, just keep any resource we master */
-		if (res->owner == dlm->node_num)
-		{
-			if (!list_empty(&res->purge)) {
-				mlog(0, "we master %s:%.*s, but it is on "
-				     "the purge list.  Removing\n",
-				     dlm->name, res->lockname.len,
-				     res->lockname.name);
-				list_del_init(&res->purge);
-				dlm->purge_count--;
-			}
-			return;
-		}
-
 		if (list_empty(&res->purge)) {
-			mlog(0, "putting lockres %.*s from purge list\n",
-			     res->lockname.len, res->lockname.name);
+			mlog(0, "putting lockres %.*s:%p onto purge list\n",
+			     res->lockname.len, res->lockname.name, res);
 
 			res->last_used = jiffies;
+			dlm_lockres_get(res);
 			list_add_tail(&res->purge, &dlm->purge_list);
 			dlm->purge_count++;
-
-			/* if this node is not the owner, there is
-			 * no way to keep track of who the owner could be.
-			 * unhash it to avoid serious problems. */
-			if (res->owner != dlm->node_num) {
-				mlog(0, "%s:%.*s: doing immediate "
-				     "purge of lockres owned by %u\n",
-				     dlm->name, res->lockname.len,
-				     res->lockname.name, res->owner);
-
-				dlm_purge_lockres_now(dlm, res);
-			}
 		}
 	} else if (!list_empty(&res->purge)) {
-		mlog(0, "removing lockres %.*s from purge list, "
-		     "owner=%u\n", res->lockname.len, res->lockname.name,
-		     res->owner);
+		mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
+		     res->lockname.len, res->lockname.name, res, res->owner);
 
 		list_del_init(&res->purge);
+		dlm_lockres_put(res);
 		dlm->purge_count--;
 	}
 }
@@ -163,68 +154,65 @@
 	spin_unlock(&dlm->spinlock);
 }
 
-/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
- * to do migration, but will re-acquire before exit. */
-void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
+static int dlm_purge_lockres(struct dlm_ctxt *dlm,
+			     struct dlm_lock_resource *res)
 {
 	int master;
-	int ret;
+	int ret = 0;
 
-	spin_lock(&lockres->spinlock);
-	master = lockres->owner == dlm->node_num;
-	spin_unlock(&lockres->spinlock);
-
-	mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
-	     lockres->lockname.name, master);
-
-	/* Non master is the easy case -- no migration required, just
-	 * quit. */
+	spin_lock(&res->spinlock);
+	if (!__dlm_lockres_unused(res)) {
+		spin_unlock(&res->spinlock);
+		mlog(0, "%s:%.*s: tried to purge but not unused\n",
+		     dlm->name, res->lockname.len, res->lockname.name);
+		return -ENOTEMPTY;
+	}
+	master = (res->owner == dlm->node_num);
 	if (!master)
-		goto finish;
+		res->state |= DLM_LOCK_RES_DROPPING_REF;
+	spin_unlock(&res->spinlock);
 
-	/* Wheee! Migrate lockres here! */
-	spin_unlock(&dlm->spinlock);
-again:
+	mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
+	     res->lockname.name, master);
 
-	ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
-	if (ret == -ENOTEMPTY) {
-		mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
-		     lockres->lockname.len, lockres->lockname.name);
-
-		BUG();
-	} else if (ret < 0) {
-		mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
-		     lockres->lockname.len, lockres->lockname.name);
-		msleep(100);
-		goto again;
+	if (!master) {
+		spin_lock(&res->spinlock);
+		/* This ensures that clear refmap is sent after the set */
+		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+		spin_unlock(&res->spinlock);
+		/* drop spinlock to do messaging, retake below */
+		spin_unlock(&dlm->spinlock);
+		/* clear our bit from the master's refmap, ignore errors */
+		ret = dlm_drop_lockres_ref(dlm, res);
+		if (ret < 0) {
+			mlog_errno(ret);
+			if (!dlm_is_host_down(ret))
+				BUG();
+		}
+		mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
+		     dlm->name, res->lockname.len, res->lockname.name, ret);
+		spin_lock(&dlm->spinlock);
 	}
 
-	spin_lock(&dlm->spinlock);
-
-finish:
-	if (!list_empty(&lockres->purge)) {
-		list_del_init(&lockres->purge);
+	if (!list_empty(&res->purge)) {
+		mlog(0, "removing lockres %.*s:%p from purgelist, "
+		     "master = %d\n", res->lockname.len, res->lockname.name,
+		     res, master);
+		list_del_init(&res->purge);
+		dlm_lockres_put(res);
 		dlm->purge_count--;
 	}
-	__dlm_unhash_lockres(lockres);
-}
+	__dlm_unhash_lockres(res);
 
-/* make an unused lockres go away immediately.
- * as soon as the dlm spinlock is dropped, this lockres
- * will not be found. kfree still happens on last put. */
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-				  struct dlm_lock_resource *lockres)
-{
-	assert_spin_locked(&dlm->spinlock);
-	assert_spin_locked(&lockres->spinlock);
-
-	BUG_ON(!__dlm_lockres_unused(lockres));
-
-	if (!list_empty(&lockres->purge)) {
-		list_del_init(&lockres->purge);
-		dlm->purge_count--;
+	/* lockres is not in the hash now.  drop the flag and wake up
+	 * any processes waiting in dlm_get_lock_resource. */
+	if (!master) {
+		spin_lock(&res->spinlock);
+		res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+		spin_unlock(&res->spinlock);
+		wake_up(&res->wq);
 	}
-	__dlm_unhash_lockres(lockres);
+	return 0;
 }
 
 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
@@ -268,13 +256,17 @@
 			break;
 		}
 
+		mlog(0, "removing lockres %.*s:%p from purgelist\n",
+		     lockres->lockname.len, lockres->lockname.name, lockres);
 		list_del_init(&lockres->purge);
+		dlm_lockres_put(lockres);
 		dlm->purge_count--;
 
 		/* This may drop and reacquire the dlm spinlock if it
 		 * has to do migration. */
 		mlog(0, "calling dlm_purge_lockres!\n");
-		dlm_purge_lockres(dlm, lockres);
+		if (dlm_purge_lockres(dlm, lockres))
+			BUG();
 		mlog(0, "DONE calling dlm_purge_lockres!\n");
 
 		/* Avoid adding any scheduling latencies */
@@ -467,12 +459,17 @@
 	assert_spin_locked(&res->spinlock);
 
 	/* don't shuffle secondary queues */
-	if ((res->owner == dlm->node_num) &&
-	    !(res->state & DLM_LOCK_RES_DIRTY)) {
-		/* ref for dirty_list */
-		dlm_lockres_get(res);
-		list_add_tail(&res->dirty, &dlm->dirty_list);
-		res->state |= DLM_LOCK_RES_DIRTY;
+	if ((res->owner == dlm->node_num)) {
+		if (res->state & (DLM_LOCK_RES_MIGRATING |
+				  DLM_LOCK_RES_BLOCK_DIRTY))
+		    return;
+
+		if (list_empty(&res->dirty)) {
+			/* ref for dirty_list */
+			dlm_lockres_get(res);
+			list_add_tail(&res->dirty, &dlm->dirty_list);
+			res->state |= DLM_LOCK_RES_DIRTY;
+		}
 	}
 }
 
@@ -651,7 +648,7 @@
 			dlm_lockres_get(res);
 
 			spin_lock(&res->spinlock);
-			res->state &= ~DLM_LOCK_RES_DIRTY;
+			/* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
 			list_del_init(&res->dirty);
 			spin_unlock(&res->spinlock);
 			spin_unlock(&dlm->spinlock);
@@ -675,10 +672,11 @@
 			/* it is now ok to move lockreses in these states
 			 * to the dirty list, assuming that they will only be
 			 * dirty for a short while. */
+			BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
-					  DLM_LOCK_RES_MIGRATING |
 					  DLM_LOCK_RES_RECOVERING)) {
 				/* move it to the tail and keep going */
+				res->state &= ~DLM_LOCK_RES_DIRTY;
 				spin_unlock(&res->spinlock);
 				mlog(0, "delaying list shuffling for in-"
 				     "progress lockres %.*s, state=%d\n",
@@ -699,6 +697,7 @@
 
 			/* called while holding lockres lock */
 			dlm_shuffle_lists(dlm, res);
+			res->state &= ~DLM_LOCK_RES_DIRTY;
 			spin_unlock(&res->spinlock);
 
 			dlm_lockres_calc_usage(dlm, res);
@@ -709,11 +708,8 @@
 			/* if the lock was in-progress, stick
 			 * it on the back of the list */
 			if (delay) {
-				/* ref for dirty_list */
-				dlm_lockres_get(res);
 				spin_lock(&res->spinlock);
-				list_add_tail(&res->dirty, &dlm->dirty_list);
-				res->state |= DLM_LOCK_RES_DIRTY;
+				__dlm_dirty_lockres(dlm, res);
 				spin_unlock(&res->spinlock);
 			}
 			dlm_lockres_put(res);
diff --git a/fs/ocfs2/dlm/dlmunlock.c b/fs/ocfs2/dlm/dlmunlock.c
index 37be4b2..86ca085 100644
--- a/fs/ocfs2/dlm/dlmunlock.c
+++ b/fs/ocfs2/dlm/dlmunlock.c
@@ -147,6 +147,10 @@
 		goto leave;
 	}
 
+	if (res->state & DLM_LOCK_RES_MIGRATING) {
+		status = DLM_MIGRATING;
+		goto leave;
+	}
 
 	/* see above for what the spec says about
 	 * LKM_CANCEL and the lock queue state */
@@ -244,8 +248,8 @@
 		/* this should always be coupled with list removal */
 		BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
 		mlog(0, "lock %u:%llu should be gone now! refs=%d\n",
-		     dlm_get_lock_cookie_node(lock->ml.cookie),
-		     dlm_get_lock_cookie_seq(lock->ml.cookie),
+		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
 		     atomic_read(&lock->lock_refs.refcount)-1);
 		dlm_lock_put(lock);
 	}
@@ -379,7 +383,8 @@
  * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
  *          return value from dlmunlock_master
  */
-int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
+			    void **ret_data)
 {
 	struct dlm_ctxt *dlm = data;
 	struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
@@ -502,8 +507,8 @@
 	if (!found)
 		mlog(ML_ERROR, "failed to find lock to unlock! "
 			       "cookie=%u:%llu\n",
-			       dlm_get_lock_cookie_node(unlock->cookie),
-			       dlm_get_lock_cookie_seq(unlock->cookie));
+		     dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)),
+		     dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie)));
 	else
 		dlm_lock_put(lock);
 
diff --git a/fs/ocfs2/vote.c b/fs/ocfs2/vote.c
index 0afd8b9..f30e63b 100644
--- a/fs/ocfs2/vote.c
+++ b/fs/ocfs2/vote.c
@@ -887,7 +887,7 @@
 
 static int ocfs2_handle_response_message(struct o2net_msg *msg,
 					 u32 len,
-					 void *data)
+					 void *data, void **ret_data)
 {
 	unsigned int response_id, node_num;
 	int response_status;
@@ -943,7 +943,7 @@
 
 static int ocfs2_handle_vote_message(struct o2net_msg *msg,
 				     u32 len,
-				     void *data)
+				     void *data, void **ret_data)
 {
 	int status;
 	struct ocfs2_super *osb = data;
@@ -1007,7 +1007,7 @@
 					osb->net_key,
 					sizeof(struct ocfs2_response_msg),
 					ocfs2_handle_response_message,
-					osb, &osb->osb_net_handlers);
+					osb, NULL, &osb->osb_net_handlers);
 	if (status) {
 		mlog_errno(status);
 		goto bail;
@@ -1017,7 +1017,7 @@
 					osb->net_key,
 					sizeof(struct ocfs2_vote_msg),
 					ocfs2_handle_vote_message,
-					osb, &osb->osb_net_handlers);
+					osb, NULL, &osb->osb_net_handlers);
 	if (status) {
 		mlog_errno(status);
 		goto bail;