Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph fixes from Sage Weil:
 "There is a pair of fixes for double-frees in the recent bundle for
  3.10, a couple of fixes for long-standing bugs (sleep while atomic and
  an endianness fix), and a locking fix that can be triggered when osds
  are going down"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: fix cleanup in rbd_add()
  rbd: don't destroy ceph_opts in rbd_add()
  ceph: ceph_pagelist_append might sleep while atomic
  ceph: add cpu_to_le32() calls when encoding a reconnect capability
  libceph: must hold mutex for reset_changed_osds()
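
The "might sleep while atomic" fix below removes ceph_pagelist_append()
calls (which may allocate, and therefore sleep) from under lock_flocks():
the locks are counted under the lock, a flat ceph_filelock array is
allocated with the lock dropped, the array is filled back under the lock,
and the whole sequence retries if locks were added in the meantime
(-ENOSPC).  The cpu_to_le32() fix makes the encoded counts endian-safe on
the wire.  A minimal userspace sketch of the same pattern, with
hypothetical names, pthread locking standing in for lock_flocks(), and
htole32() standing in for cpu_to_le32():

    /*
     * Sketch only, not kernel code: count under the lock, allocate with
     * the lock dropped (allocation may sleep), re-take the lock to
     * encode, and retry from the top if the count grew in the meantime.
     */
    #include <endian.h>
    #include <errno.h>
    #include <pthread.h>
    #include <stdint.h>
    #include <stdlib.h>

    struct item { uint32_t v; struct item *next; };

    static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
    static struct item *items;

    /* Encode at most max items into buf; -ENOSPC if more exist. */
    static int encode_items(uint32_t *buf, int max)
    {
            struct item *i;
            int n = 0;

            for (i = items; i; i = i->next) {
                    if (n >= max)
                            return -ENOSPC;
                    buf[n++] = htole32(i->v);  /* fixed on-wire endianness */
            }
            return n;
    }

    int snapshot_items(uint32_t **out, int *out_n)
    {
            struct item *i;
            uint32_t *buf;
            int n, ret;

    again:
            pthread_mutex_lock(&list_lock);
            for (n = 0, i = items; i; i = i->next)
                    n++;
            pthread_mutex_unlock(&list_lock);

            buf = malloc(n * sizeof(*buf));    /* may sleep: lock dropped */
            if (n && !buf)
                    return -ENOMEM;

            pthread_mutex_lock(&list_lock);
            ret = encode_items(buf, n);
            pthread_mutex_unlock(&list_lock);

            if (ret == -ENOSPC) {              /* list grew; go around again */
                    free(buf);
                    goto again;
            }
            *out = buf;
            *out_n = ret;
            return 0;
    }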
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index d6d3140..3063452 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -519,8 +519,8 @@
 };
 
 /*
- * Initialize an rbd client instance.
- * We own *ceph_opts.
+ * Initialize an rbd client instance.  Success or not, this function
+ * consumes ceph_opts.
  */
 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 {
@@ -675,7 +675,8 @@
 
 /*
  * Get a ceph client with specific addr and configuration, if one does
- * not exist create it.
+ * not exist create it.  Either way, ceph_opts is consumed by this
+ * function.
  */
 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
@@ -4697,8 +4698,10 @@
 	return ret;
 }
 
-/* Undo whatever state changes are made by v1 or v2 image probe */
-
+/*
+ * Undo whatever state changes are made by v1 or v2 header info
+ * call.
+ */
 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 {
 	struct rbd_image_header	*header;
@@ -4902,9 +4905,10 @@
 	int tmp;
 
 	/*
-	 * Get the id from the image id object.  If it's not a
-	 * format 2 image, we'll get ENOENT back, and we'll assume
-	 * it's a format 1 image.
+	 * Get the id from the image id object.  Unless there's an
+	 * error, rbd_dev->spec->image_id will be filled in with
+	 * a dynamically-allocated string, and rbd_dev->image_format
+	 * will be set to either 1 or 2.
 	 */
 	ret = rbd_dev_image_id(rbd_dev);
 	if (ret)
@@ -4992,7 +4996,6 @@
 		rc = PTR_ERR(rbdc);
 		goto err_out_args;
 	}
-	ceph_opts = NULL;	/* rbd_dev client now owns this */
 
 	/* pick the pool */
 	osdc = &rbdc->client->osdc;
@@ -5027,18 +5030,18 @@
 	rbd_dev->mapping.read_only = read_only;
 
 	rc = rbd_dev_device_setup(rbd_dev);
-	if (!rc)
-		return count;
+	if (rc) {
+		rbd_dev_image_release(rbd_dev);
+		goto err_out_module;
+	}
 
-	rbd_dev_image_release(rbd_dev);
+	return count;
+
 err_out_rbd_dev:
 	rbd_dev_destroy(rbd_dev);
 err_out_client:
 	rbd_put_client(rbdc);
 err_out_args:
-	if (ceph_opts)
-		ceph_destroy_options(ceph_opts);
-	kfree(rbd_opts);
 	rbd_spec_put(spec);
 err_out_module:
 	module_put(THIS_MODULE);
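
The comment changes above codify the ownership rule behind the
double-free fixes: rbd_get_client() (via rbd_client_create()) consumes
ceph_opts on success and on failure alike, so the caller must never free
it, and the "ceph_opts = NULL" bookkeeping in rbd_add() becomes
unnecessary.  A sketch of that convention, with hypothetical names:

    #include <stdlib.h>

    struct opts { int flags; };
    struct client { struct opts *opts; };

    /* Consumes opts: every path, including failure, disposes of it,
     * so the caller never frees opts and cannot double-free it. */
    struct client *client_create(struct opts *opts)
    {
            struct client *c = malloc(sizeof(*c));

            if (!c) {
                    free(opts);     /* error path still consumes opts */
                    return NULL;
            }
            c->opts = opts;         /* success: the client owns opts now */
            return c;
    }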
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index 202dd3d6..ebbf680 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -191,27 +191,23 @@
 }
 
 /**
- * Encode the flock and fcntl locks for the given inode into the pagelist.
- * Format is: #fcntl locks, sequential fcntl locks, #flock locks,
- * sequential flock locks.
- * Must be called with lock_flocks() already held.
- * If we encounter more of a specific lock type than expected,
- * we return the value 1.
+ * Encode the flock and fcntl locks for the given inode into the ceph_filelock
+ * array. Must be called with lock_flocks() already held.
+ * If we encounter more of a specific lock type than expected, return -ENOSPC.
  */
-int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist,
-		      int num_fcntl_locks, int num_flock_locks)
+int ceph_encode_locks_to_buffer(struct inode *inode,
+				struct ceph_filelock *flocks,
+				int num_fcntl_locks, int num_flock_locks)
 {
 	struct file_lock *lock;
-	struct ceph_filelock cephlock;
 	int err = 0;
 	int seen_fcntl = 0;
 	int seen_flock = 0;
+	int l = 0;
 
 	dout("encoding %d flock and %d fcntl locks", num_flock_locks,
 	     num_fcntl_locks);
-	err = ceph_pagelist_append(pagelist, &num_fcntl_locks, sizeof(u32));
-	if (err)
-		goto fail;
+
 	for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
 		if (lock->fl_flags & FL_POSIX) {
 			++seen_fcntl;
@@ -219,19 +215,12 @@
 				err = -ENOSPC;
 				goto fail;
 			}
-			err = lock_to_ceph_filelock(lock, &cephlock);
+			err = lock_to_ceph_filelock(lock, &flocks[l]);
 			if (err)
 				goto fail;
-			err = ceph_pagelist_append(pagelist, &cephlock,
-					   sizeof(struct ceph_filelock));
+			++l;
 		}
-		if (err)
-			goto fail;
 	}
-
-	err = ceph_pagelist_append(pagelist, &num_flock_locks, sizeof(u32));
-	if (err)
-		goto fail;
 	for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
 		if (lock->fl_flags & FL_FLOCK) {
 			++seen_flock;
@@ -239,19 +228,51 @@
 				err = -ENOSPC;
 				goto fail;
 			}
-			err = lock_to_ceph_filelock(lock, &cephlock);
+			err = lock_to_ceph_filelock(lock, &flocks[l]);
 			if (err)
 				goto fail;
-			err = ceph_pagelist_append(pagelist, &cephlock,
-					   sizeof(struct ceph_filelock));
+			++l;
 		}
-		if (err)
-			goto fail;
 	}
 fail:
 	return err;
 }
 
+/**
+ * Copy the encoded flock and fcntl locks into the pagelist.
+ * Format is: #fcntl locks, sequential fcntl locks, #flock locks,
+ * sequential flock locks.
+ * Returns zero on success.
+ */
+int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
+			   struct ceph_pagelist *pagelist,
+			   int num_fcntl_locks, int num_flock_locks)
+{
+	int err = 0;
+	__le32 nlocks;
+
+	nlocks = cpu_to_le32(num_fcntl_locks);
+	err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
+	if (err)
+		goto out_fail;
+
+	err = ceph_pagelist_append(pagelist, flocks,
+				   num_fcntl_locks * sizeof(*flocks));
+	if (err)
+		goto out_fail;
+
+	nlocks = cpu_to_le32(num_flock_locks);
+	err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
+	if (err)
+		goto out_fail;
+
+	err = ceph_pagelist_append(pagelist,
+				   &flocks[num_fcntl_locks],
+				   num_flock_locks * sizeof(*flocks));
+out_fail:
+	return err;
+}
+
 /*
  * Given a pointer to a lock, convert it to a ceph filelock
  */
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 4f22671..4d29203 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2478,39 +2478,44 @@
 
 	if (recon_state->flock) {
 		int num_fcntl_locks, num_flock_locks;
-		struct ceph_pagelist_cursor trunc_point;
+		struct ceph_filelock *flocks;
 
-		ceph_pagelist_set_cursor(pagelist, &trunc_point);
-		do {
-			lock_flocks();
-			ceph_count_locks(inode, &num_fcntl_locks,
-					 &num_flock_locks);
-			rec.v2.flock_len = (2*sizeof(u32) +
-					    (num_fcntl_locks+num_flock_locks) *
-					    sizeof(struct ceph_filelock));
-			unlock_flocks();
-
-			/* pre-alloc pagelist */
-			ceph_pagelist_truncate(pagelist, &trunc_point);
-			err = ceph_pagelist_append(pagelist, &rec, reclen);
-			if (!err)
-				err = ceph_pagelist_reserve(pagelist,
-							    rec.v2.flock_len);
-
-			/* encode locks */
-			if (!err) {
-				lock_flocks();
-				err = ceph_encode_locks(inode,
-							pagelist,
-							num_fcntl_locks,
-							num_flock_locks);
-				unlock_flocks();
-			}
-		} while (err == -ENOSPC);
+encode_again:
+		lock_flocks();
+		ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+		unlock_flocks();
+		flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
+				 sizeof(struct ceph_filelock), GFP_NOFS);
+		if (!flocks) {
+			err = -ENOMEM;
+			goto out_free;
+		}
+		lock_flocks();
+		err = ceph_encode_locks_to_buffer(inode, flocks,
+						  num_fcntl_locks,
+						  num_flock_locks);
+		unlock_flocks();
+		if (err) {
+			kfree(flocks);
+			if (err == -ENOSPC)
+				goto encode_again;
+			goto out_free;
+		}
+		/*
+		 * number of encoded locks is stable, so copy to pagelist
+		 */
+		rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
+				    (num_fcntl_locks+num_flock_locks) *
+				    sizeof(struct ceph_filelock));
+		err = ceph_pagelist_append(pagelist, &rec, reclen);
+		if (!err)
+			err = ceph_locks_to_pagelist(flocks, pagelist,
+						     num_fcntl_locks,
+						     num_flock_locks);
+		kfree(flocks);
 	} else {
 		err = ceph_pagelist_append(pagelist, &rec, reclen);
 	}
-
 out_free:
 	kfree(path);
 out_dput:
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 8696be2f..7ccfdb4 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -822,8 +822,13 @@
 extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl);
 extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl);
 extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num);
-extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p,
-			     int p_locks, int f_locks);
+extern int ceph_encode_locks_to_buffer(struct inode *inode,
+				       struct ceph_filelock *flocks,
+				       int num_fcntl_locks,
+				       int num_flock_locks);
+extern int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
+				  struct ceph_pagelist *pagelist,
+				  int num_fcntl_locks, int num_flock_locks);
 extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
 
 /* debugfs.c */
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d5953b8..3a246a6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1675,13 +1675,13 @@
 		__register_request(osdc, req);
 		__unregister_linger_request(osdc, req);
 	}
+	reset_changed_osds(osdc);
 	mutex_unlock(&osdc->request_mutex);
 
 	if (needmap) {
 		dout("%d requests for down osds, need new map\n", needmap);
 		ceph_monc_request_next_osdmap(&osdc->client->monc);
 	}
-	reset_changed_osds(osdc);
 }
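
The osd_client.c change moves the reset_changed_osds() call inside the
request_mutex critical section, since it operates on state that mutex
protects.  As a hedged aside (not part of this patch), a
lockdep_assert_held() at the top of the callee is one way to make such a
locking rule self-checking:

    /* Sketch only: lockdep (when enabled) would flag any caller that,
     * like the old call site, invokes this without request_mutex held. */
    static void reset_changed_osds(struct ceph_osd_client *osdc)
    {
            lockdep_assert_held(&osdc->request_mutex);
            /* ... walk and reset the flagged osds ... */
    }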