ceph: implement DIRLAYOUTHASH feature to get dir layout from MDS

This implements the DIRLAYOUTHASH protocol feature, which passes the dir
layout over the wire from the MDS.  This gives the client knowledge
of the correct hash function to use for mapping dentries among dir
fragments.

Note that if this feature is _not_ present on the client but is on the
MDS, the client may misdirect requests.  This will result in a forward
and degrade performance.  It may also result in inaccurate NFS filehandle
generation, which will prevent fh resolution when the inode is not present
in the client cache and the parent directories have been fragmented.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 045283c..e791fa3 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -682,6 +682,8 @@
 		inode->i_op = &ceph_dir_iops;
 		inode->i_fop = &ceph_dir_fops;
 
+		ci->i_dir_layout = iinfo->dir_layout;
+
 		ci->i_files = le64_to_cpu(info->files);
 		ci->i_subdirs = le64_to_cpu(info->subdirs);
 		ci->i_rbytes = le64_to_cpu(info->rbytes);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 38800ea..9be29b0 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -60,7 +60,8 @@
  * parse individual inode info
  */
 static int parse_reply_info_in(void **p, void *end,
-			       struct ceph_mds_reply_info_in *info)
+			       struct ceph_mds_reply_info_in *info,
+			       int features)
 {
 	int err = -EIO;
 
@@ -74,6 +75,12 @@
 	info->symlink = *p;
 	*p += info->symlink_len;
 
+	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
+		ceph_decode_copy_safe(p, end, &info->dir_layout,
+				      sizeof(info->dir_layout), bad);
+	else
+		memset(&info->dir_layout, 0, sizeof(info->dir_layout));
+
 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
 	ceph_decode_need(p, end, info->xattr_len, bad);
 	info->xattr_data = *p;
@@ -88,12 +95,13 @@
  * target inode.
  */
 static int parse_reply_info_trace(void **p, void *end,
-				  struct ceph_mds_reply_info_parsed *info)
+				  struct ceph_mds_reply_info_parsed *info,
+				  int features)
 {
 	int err;
 
 	if (info->head->is_dentry) {
-		err = parse_reply_info_in(p, end, &info->diri);
+		err = parse_reply_info_in(p, end, &info->diri, features);
 		if (err < 0)
 			goto out_bad;
 
@@ -114,7 +122,7 @@
 	}
 
 	if (info->head->is_target) {
-		err = parse_reply_info_in(p, end, &info->targeti);
+		err = parse_reply_info_in(p, end, &info->targeti, features);
 		if (err < 0)
 			goto out_bad;
 	}
@@ -134,7 +142,8 @@
  * parse readdir results
  */
 static int parse_reply_info_dir(void **p, void *end,
-				struct ceph_mds_reply_info_parsed *info)
+				struct ceph_mds_reply_info_parsed *info,
+				int features)
 {
 	u32 num, i = 0;
 	int err;
@@ -182,7 +191,7 @@
 		*p += sizeof(struct ceph_mds_reply_lease);
 
 		/* inode */
-		err = parse_reply_info_in(p, end, &info->dir_in[i]);
+		err = parse_reply_info_in(p, end, &info->dir_in[i], features);
 		if (err < 0)
 			goto out_bad;
 		i++;
@@ -205,7 +214,8 @@
  * parse fcntl F_GETLK results
  */
 static int parse_reply_info_filelock(void **p, void *end,
-                struct ceph_mds_reply_info_parsed *info)
+				     struct ceph_mds_reply_info_parsed *info,
+				     int features)
 {
 	if (*p + sizeof(*info->filelock_reply) > end)
 		goto bad;
@@ -225,19 +235,21 @@
  * parse extra results
  */
 static int parse_reply_info_extra(void **p, void *end,
-                struct ceph_mds_reply_info_parsed *info)
+				  struct ceph_mds_reply_info_parsed *info,
+				  int features)
 {
 	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
-		return parse_reply_info_filelock(p, end, info);
+		return parse_reply_info_filelock(p, end, info, features);
 	else
-		return parse_reply_info_dir(p, end, info);
+		return parse_reply_info_dir(p, end, info, features);
 }
 
 /*
  * parse entire mds reply
  */
 static int parse_reply_info(struct ceph_msg *msg,
-			    struct ceph_mds_reply_info_parsed *info)
+			    struct ceph_mds_reply_info_parsed *info,
+			    int features)
 {
 	void *p, *end;
 	u32 len;
@@ -250,7 +262,7 @@
 	/* trace */
 	ceph_decode_32_safe(&p, end, len, bad);
 	if (len > 0) {
-		err = parse_reply_info_trace(&p, p+len, info);
+		err = parse_reply_info_trace(&p, p+len, info, features);
 		if (err < 0)
 			goto out_bad;
 	}
@@ -258,7 +270,7 @@
 	/* extra */
 	ceph_decode_32_safe(&p, end, len, bad);
 	if (len > 0) {
-		err = parse_reply_info_extra(&p, p+len, info);
+		err = parse_reply_info_extra(&p, p+len, info, features);
 		if (err < 0)
 			goto out_bad;
 	}
@@ -654,7 +666,7 @@
 		} else {
 			/* dir + name */
 			inode = dir;
-			hash = req->r_dentry->d_name.hash;
+			hash = ceph_dentry_hash(req->r_dentry);
 			is_hash = true;
 		}
 	}
@@ -2101,7 +2113,7 @@
 
 	dout("handle_reply tid %lld result %d\n", tid, result);
 	rinfo = &req->r_reply_info;
-	err = parse_reply_info(msg, rinfo);
+	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
 	mutex_unlock(&mdsc->mutex);
 
 	mutex_lock(&session->s_mutex);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index aabe563..f8f27f6 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -35,6 +35,7 @@
  */
 struct ceph_mds_reply_info_in {
 	struct ceph_mds_reply_inode *in;
+	struct ceph_dir_layout dir_layout;
 	u32 symlink_len;
 	char *symlink;
 	u32 xattr_len;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 08b460a..1417f3f 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -428,7 +428,8 @@
 		goto fail;
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-	fsc->client->supported_features |= CEPH_FEATURE_FLOCK;
+	fsc->client->supported_features |= CEPH_FEATURE_FLOCK |
+		CEPH_FEATURE_DIRLAYOUTHASH;
 	fsc->client->monc.want_mdsmap = 1;
 
 	fsc->mount_options = fsopt;