ceph: negotiate authentication protocol; implement AUTH_NONE protocol

When we open a monitor session, we send an initial AUTH message listing
the auth protocols we support, our entity name, and (possibly) a previously
assigned global_id.  The monitor chooses a protocol and responds with an
initial message.

Initially implement AUTH_NONE, a dummy protocol that provides no security,
but works within the new framework.  It generates 'authorizers' that simply
state our entity name and global_id; these are used when connecting to
services (mds, osd).

This is a wire protocol change.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index bdd3e6f..827629c8 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -13,6 +13,7 @@
 	mon_client.o \
 	osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
 	debugfs.o \
+	auth.o auth_none.o \
 	ceph_fs.o ceph_strings.o ceph_hash.o ceph_frag.o
 
 else
diff --git a/fs/ceph/auth.c b/fs/ceph/auth.c
new file mode 100644
index 0000000..c4d1eee
--- /dev/null
+++ b/fs/ceph/auth.c
@@ -0,0 +1,220 @@
+#include "ceph_debug.h"
+
+#include <linux/module.h>
+#include <linux/err.h>
+
+#include "types.h"
+#include "auth_none.h"
+#include "decode.h"
+#include "super.h"
+
+#include "messenger.h"
+
+/*
+ * get protocol handler
+ */
+static u32 supported_protocols[] = {
+	CEPH_AUTH_NONE
+};
+
+int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol)
+{
+	switch (protocol) {
+	case CEPH_AUTH_NONE:
+		return ceph_auth_none_init(ac);
+	default:
+		return -ENOENT;
+	}
+}
+
+/*
+ * setup, teardown.
+ *
+ * ceph_auth_init allocates an auth client for entity 'name' (or
+ * CEPH_AUTH_NAME_DEFAULT if NULL).  name and secret are referenced, not
+ * copied.  Returns the client, or ERR_PTR(-ENOMEM).
+ */
+struct ceph_auth_client *ceph_auth_init(const char *name, const char *secret)
+{
+	struct ceph_auth_client *ac;
+
+	ac = kzalloc(sizeof(*ac), GFP_NOFS);
+	if (!ac)
+		return ERR_PTR(-ENOMEM);
+
+	ac->negotiating = true;
+	if (name)
+		ac->name = name;
+	else
+		ac->name = CEPH_AUTH_NAME_DEFAULT;
+	ac->secret = secret;
+
+	/* never log the secret itself, only whether one was supplied */
+	dout("auth_init name '%s' secret %s\n", ac->name,
+	     secret ? "(set)" : "(none)");
+	return ac;
+}
+
+void ceph_auth_destroy(struct ceph_auth_client *ac)
+{
+	dout("auth_destroy %p\n", ac);
+	if (ac->ops)
+		ac->ops->destroy(ac);
+	kfree(ac);
+}
+
+/*
+ * Reset occurs when reconnecting to the monitor.
+ */
+void ceph_auth_reset(struct ceph_auth_client *ac)
+{
+	dout("auth_reset %p\n", ac);
+	if (ac->ops && !ac->negotiating)
+		ac->ops->reset(ac);
+	ac->negotiating = true;
+}
+
+int ceph_entity_name_encode(const char *name, void **p, void *end)
+{
+	int len = strlen(name);
+
+	if (*p + 2*sizeof(u32) + len > end)
+		return -ERANGE;
+	ceph_encode_32(p, CEPH_ENTITY_TYPE_CLIENT);
+	ceph_encode_32(p, len);
+	ceph_encode_copy(p, name, len);
+	return 0;
+}
+
+/*
+ * Initiate protocol negotiation with monitor.  Include entity name
+ * and list supported protocols.  Returns message length or -ERANGE.
+ */
+int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len)
+{
+	struct ceph_mon_request_header *monhdr = buf;
+	void *p = monhdr + 1, *end = buf + len, *lenp;
+	int i, num;
+	int ret;
+
+	dout("auth_build_hello\n");
+	monhdr->have_version = 0;
+	monhdr->session_mon = cpu_to_le16(-1);
+	monhdr->session_mon_tid = 0;
+
+	ceph_encode_32(&p, 0);  /* no protocol, yet */
+
+	lenp = p;		/* payload length goes here; filled in below */
+	p += sizeof(u32);
+
+	num = ARRAY_SIZE(supported_protocols);
+	ceph_encode_32(&p, num);
+	for (i = 0; i < num; i++)
+		ceph_encode_32(&p, supported_protocols[i]);
+
+	ret = ceph_entity_name_encode(ac->name, &p, end);
+	if (ret < 0)
+		return ret;
+	ceph_decode_need(&p, end, sizeof(u64), bad);
+	ceph_encode_64(&p, ac->global_id);
+
+	ceph_encode_32(&lenp, p - lenp - sizeof(u32));
+	return p - buf;		/* total bytes written, header included */
+
+bad:
+	return -ERANGE;
+}
+
+/*
+ * Handle auth message from monitor.  Decodes the reply, picks up a new
+ * global_id, sets up the chosen protocol handler, and passes it the
+ * payload.  Returns the length of a follow-up request built in reply_buf
+ * (when the handler asks for one with -EAGAIN), 0 if done, or -errno.
+ */
+int ceph_handle_auth_reply(struct ceph_auth_client *ac,
+			   void *buf, size_t len,
+			   void *reply_buf, size_t reply_len)
+{
+	void *p = buf, *end = buf + len;
+	int protocol;
+	s32 result;
+	u64 global_id;
+	void *payload, *payload_end;
+	char *result_msg;
+	int payload_len, result_msg_len;
+	int ret = -EINVAL;
+
+	dout("handle_auth_reply %p %p\n", p, end);
+	ceph_decode_need(&p, end, sizeof(u32) * 3 + sizeof(u64), bad);
+	protocol = ceph_decode_32(&p);
+	result = ceph_decode_32(&p);
+	global_id = ceph_decode_64(&p);
+	payload_len = ceph_decode_32(&p);
+	payload = p;
+	p += payload_len;
+	ceph_decode_need(&p, end, sizeof(u32), bad);
+	result_msg_len = ceph_decode_32(&p);
+	result_msg = p;
+	p += result_msg_len;
+	if (p != end)
+		goto bad;
+
+	dout(" result %d '%.*s' gid %llu len %d\n", result, result_msg_len,
+	     result_msg, global_id, payload_len);
+	payload_end = payload + payload_len;
+
+	if (global_id && ac->global_id != global_id) {
+		dout(" set global_id %lld -> %lld\n", ac->global_id, global_id);
+		ac->global_id = global_id;
+	}
+
+	if (ac->negotiating) {
+		/* set up (new) protocol handler? */
+		if (ac->protocol && ac->protocol != protocol) {
+			ac->ops->destroy(ac);
+			ac->protocol = 0;
+			ac->ops = NULL;
+		}
+		if (!ac->ops) {	/* also rejects protocol 0: no handler */
+			ret = ceph_auth_init_protocol(ac, protocol);
+			if (ret) {
+				pr_err("error %d on auth protocol %d init\n",
+				       ret, protocol);
+				goto out;
+			}
+		}
+	}
+
+	ret = ac->ops->handle_reply(ac, result, payload, payload_end);
+	if (ret == -EAGAIN && ac->ops->build_request) {
+		struct ceph_mon_request_header *monhdr = reply_buf;
+		void *rp = monhdr + 1;	/* body starts after the header */
+		void *rend = reply_buf + reply_len;
+
+		monhdr->have_version = 0;
+		monhdr->session_mon = cpu_to_le16(-1);
+		monhdr->session_mon_tid = 0;
+
+		ceph_encode_32(&rp, ac->protocol);
+		/* build past the u32 length slot; fill it in afterwards */
+		ret = ac->ops->build_request(ac, rp + sizeof(u32), rend);
+		if (ret < 0) {
+			pr_err("error %d building request\n", ret);
+			goto out;
+		}
+		dout(" built request %d bytes\n", ret);
+		ceph_encode_32(&rp, ret);
+		return rp + ret - reply_buf;
+	} else if (ret) {
+		pr_err("authentication error %d\n", ret);
+		return ret;
+	}
+	return 0;
+
+bad:
+	pr_err("failed to decode auth msg\n");
+out:
+	return ret;
+}
+
+
diff --git a/fs/ceph/auth.h b/fs/ceph/auth.h
new file mode 100644
index 0000000..4d8cdf6
--- /dev/null
+++ b/fs/ceph/auth.h
@@ -0,0 +1,77 @@
+#ifndef _FS_CEPH_AUTH_H
+#define _FS_CEPH_AUTH_H
+
+#include "types.h"
+#include "buffer.h"
+
+/*
+ * Abstract interface for communicating with the authenticate module.
+ * There is some handshake that takes place between us and the monitor
+ * to acquire the necessary keys.  These are used to generate an
+ * 'authorizer' that we use when connecting to a service (mds, osd).
+ */
+
+struct ceph_auth_client;
+struct ceph_authorizer;
+
+struct ceph_auth_client_ops {
+	/*
+	 * true if we are authenticated and can connect to
+	 * services.
+	 */
+	int (*is_authenticated)(struct ceph_auth_client *ac);
+
+	/*
+	 * build requests and process replies during monitor
+	 * handshake.  if handle_reply returns -EAGAIN, we build
+	 * another request.
+	 */
+	int (*build_request)(struct ceph_auth_client *ac, void *buf, void *end);
+	int (*handle_reply)(struct ceph_auth_client *ac, int result,
+			    void *buf, void *end);
+
+	/*
+	 * Create authorizer for connecting to a service, and verify
+	 * the response to authenticate the service.
+	 */
+	int (*create_authorizer)(struct ceph_auth_client *ac, int peer_type,
+				 struct ceph_authorizer **a,
+				 void **buf, size_t *len,
+				 void **reply_buf, size_t *reply_len);
+	int (*verify_authorizer_reply)(struct ceph_auth_client *ac,
+				       struct ceph_authorizer *a, size_t len);
+	void (*destroy_authorizer)(struct ceph_auth_client *ac,
+				   struct ceph_authorizer *a);
+
+	/* reset when we (re)connect to a monitor */
+	void (*reset)(struct ceph_auth_client *ac);
+
+	void (*destroy)(struct ceph_auth_client *ac);
+};
+
+struct ceph_auth_client {
+	u32 protocol;           /* CEPH_AUTH_* */
+	void *private;          /* for use by protocol implementation */
+	const struct ceph_auth_client_ops *ops;  /* null iff protocol==0 */
+
+	bool negotiating;       /* true if negotiating protocol */
+	const char *name;       /* entity name */
+	u64 global_id;          /* our unique id in system */
+	const char *secret;     /* our secret key */
+	unsigned want_keys;     /* which services we want */
+};
+
+extern struct ceph_auth_client *ceph_auth_init(const char *name,
+					       const char *secret);
+extern void ceph_auth_destroy(struct ceph_auth_client *ac);
+
+extern void ceph_auth_reset(struct ceph_auth_client *ac);
+
+extern int ceph_auth_build_hello(struct ceph_auth_client *ac,
+				 void *buf, size_t len);
+extern int ceph_handle_auth_reply(struct ceph_auth_client *ac,
+				  void *buf, size_t len,
+				  void *reply_buf, size_t reply_len);
+extern int ceph_entity_name_encode(const char *name, void **p, void *end);
+
+#endif
diff --git a/fs/ceph/auth_none.c b/fs/ceph/auth_none.c
new file mode 100644
index 0000000..631017e
--- /dev/null
+++ b/fs/ceph/auth_none.c
@@ -0,0 +1,120 @@
+
+#include "ceph_debug.h"
+
+#include <linux/err.h>
+#include <linux/module.h>
+#include <linux/random.h>
+
+#include "auth_none.h"
+#include "auth.h"
+#include "decode.h"
+
+static void reset(struct ceph_auth_client *ac)
+{
+	struct ceph_auth_none_info *xi = ac->private;
+
+	xi->starting = true;
+	xi->built_authorizer = false;
+}
+
+static void destroy(struct ceph_auth_client *ac)
+{
+	kfree(ac->private);
+	ac->private = NULL;
+}
+
+static int is_authenticated(struct ceph_auth_client *ac)
+{
+	struct ceph_auth_none_info *xi = ac->private;
+
+	return !xi->starting;
+}
+
+/*
+ * the generic auth code decodes the global_id, and we carry no actual
+ * authentication state, so just clear 'starting' and pass result through.
+ */
+static int handle_reply(struct ceph_auth_client *ac, int result,
+			void *buf, void *end)
+{
+	struct ceph_auth_none_info *xi = ac->private;
+
+	xi->starting = false;
+	return result;
+}
+
+/*
+ * build an 'authorizer' with our entity_name and global_id.  we can
+ * reuse a single static copy since it is identical for all services
+ * we connect to.
+ */
+static int ceph_auth_none_create_authorizer(
+	struct ceph_auth_client *ac, int peer_type,
+	struct ceph_authorizer **a,
+	void **buf, size_t *len,
+	void **reply_buf, size_t *reply_len)
+{
+	struct ceph_auth_none_info *ai = ac->private;
+	struct ceph_none_authorizer *au = &ai->au;
+	void *p, *end;
+	int ret;
+
+	if (!ai->built_authorizer) {
+		p = au->buf;
+		end = p + sizeof(au->buf);
+		ret = ceph_entity_name_encode(ac->name, &p, end - 8);
+		if (ret < 0)
+			goto bad;
+		ceph_decode_need(&p, end, sizeof(u64), bad2);
+		ceph_encode_64(&p, ac->global_id);
+		au->buf_len = p - (void *)au->buf;
+		ai->built_authorizer = true;
+		dout("built authorizer len %d\n", au->buf_len);
+	}
+
+	*a = (struct ceph_authorizer *)au;
+	*buf = au->buf;
+	*len = au->buf_len;
+	*reply_buf = au->reply_buf;
+	*reply_len = sizeof(au->reply_buf);
+	return 0;
+
+bad2:
+	ret = -ERANGE;
+bad:
+	return ret;
+}
+
+static void ceph_auth_none_destroy_authorizer(struct ceph_auth_client *ac,
+				      struct ceph_authorizer *a)
+{
+	/* nothing to do */
+}
+
+static const struct ceph_auth_client_ops ceph_auth_none_ops = {
+	.reset = reset,
+	.destroy = destroy,
+	.is_authenticated = is_authenticated,
+	.handle_reply = handle_reply,
+	.create_authorizer = ceph_auth_none_create_authorizer,
+	.destroy_authorizer = ceph_auth_none_destroy_authorizer,
+};
+
+int ceph_auth_none_init(struct ceph_auth_client *ac)
+{
+	struct ceph_auth_none_info *xi;
+
+	dout("ceph_auth_none_init %p\n", ac);
+	xi = kzalloc(sizeof(*xi), GFP_NOFS);
+	if (!xi)
+		return -ENOMEM;
+
+	xi->starting = true;
+	xi->built_authorizer = false;
+
+	ac->protocol = CEPH_AUTH_NONE;
+	ac->private = xi;
+	ac->ops = &ceph_auth_none_ops;
+	return 0;
+}
+
diff --git a/fs/ceph/auth_none.h b/fs/ceph/auth_none.h
new file mode 100644
index 0000000..56c0553
--- /dev/null
+++ b/fs/ceph/auth_none.h
@@ -0,0 +1,28 @@
+#ifndef _FS_CEPH_AUTH_NONE_H
+#define _FS_CEPH_AUTH_NONE_H
+
+#include "auth.h"
+
+/*
+ * null security mode.
+ *
+ * we use a single static authorizer that simply encodes our entity name
+ * and global id.
+ */
+
+struct ceph_none_authorizer {
+	char buf[128];
+	int buf_len;
+	char reply_buf[0];
+};
+
+struct ceph_auth_none_info {
+	bool starting;
+	bool built_authorizer;
+	struct ceph_none_authorizer au;   /* we only need one; it's static */
+};
+
+extern int ceph_auth_none_init(struct ceph_auth_client *ac);
+
+#endif
+
diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h
index 36becb0..1e96a9a 100644
--- a/fs/ceph/ceph_fs.h
+++ b/fs/ceph/ceph_fs.h
@@ -75,6 +75,16 @@
 int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
 
 
+/* crypto algorithms */
+#define CEPH_CRYPTO_NONE 0x0
+#define CEPH_CRYPTO_AES  0x1
+
+/* security/authentication protocols */
+#define CEPH_AUTH_UNKNOWN	0x0
+#define CEPH_AUTH_NONE	 	0x1
+#define CEPH_AUTH_CEPHX	 	0x2
+
+
 /*********************************************
  * message layer
  */
@@ -90,12 +100,12 @@
 /* client <-> monitor */
 #define CEPH_MSG_MON_MAP                4
 #define CEPH_MSG_MON_GET_MAP            5
-#define CEPH_MSG_CLIENT_MOUNT           10
-#define CEPH_MSG_CLIENT_MOUNT_ACK       11
 #define CEPH_MSG_STATFS                 13
 #define CEPH_MSG_STATFS_REPLY           14
 #define CEPH_MSG_MON_SUBSCRIBE          15
 #define CEPH_MSG_MON_SUBSCRIBE_ACK      16
+#define CEPH_MSG_AUTH			17
+#define CEPH_MSG_AUTH_REPLY		18
 
 /* client <-> mds */
 #define CEPH_MSG_MDS_MAP                21
diff --git a/fs/ceph/ceph_strings.c b/fs/ceph/ceph_strings.c
index 90d19d9..8e4be6a 100644
--- a/fs/ceph/ceph_strings.c
+++ b/fs/ceph/ceph_strings.c
@@ -3,6 +3,19 @@
  */
 #include "types.h"
 
+const char *ceph_entity_type_name(int type)
+{
+	switch (type) {
+	case CEPH_ENTITY_TYPE_MDS: return "mds";
+	case CEPH_ENTITY_TYPE_OSD: return "osd";
+	case CEPH_ENTITY_TYPE_MON: return "mon";
+	case CEPH_ENTITY_TYPE_CLIENT: return "client";
+	case CEPH_ENTITY_TYPE_ADMIN: return "admin";
+	case CEPH_ENTITY_TYPE_AUTH: return "auth";
+	default: return "unknown";
+	}
+}
+
 const char *ceph_osd_op_name(int op)
 {
 	switch (op) {
diff --git a/fs/ceph/decode.h b/fs/ceph/decode.h
index a382aec..10de848 100644
--- a/fs/ceph/decode.h
+++ b/fs/ceph/decode.h
@@ -98,6 +98,7 @@
 static inline void ceph_decode_addr(struct ceph_entity_addr *a)
 {
 	a->in_addr.ss_family = ntohs(a->in_addr.ss_family);
+	WARN_ON(a->in_addr.ss_family == 512);
 }
 
 /*
@@ -123,6 +124,11 @@
 	*(u8 *)*p = v;
 	(*p)++;
 }
+static inline void ceph_encode_copy(void **p, const void *s, int len)
+{
+	memcpy(*p, s, len);
+	*p += len;
+}
 
 /*
  * filepath, string encoders
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 69feeb1..8a28515 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -8,6 +8,7 @@
 #include "super.h"
 #include "messenger.h"
 #include "decode.h"
+#include "auth.h"
 
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
@@ -274,8 +275,12 @@
 {
 	dout("mdsc put_session %p %d -> %d\n", s,
 	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
-	if (atomic_dec_and_test(&s->s_ref))
+	if (atomic_dec_and_test(&s->s_ref)) {
+		if (s->s_authorizer)
+			s->s_mdsc->client->monc.auth->ops->destroy_authorizer(
+				s->s_mdsc->client->monc.auth, s->s_authorizer);
 		kfree(s);
+	}
 }
 
 /*
@@ -2777,9 +2782,15 @@
 
 	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
-	if (ceph_fsid_compare(&fsid, &mdsc->client->monc.monmap->fsid)) {
-		pr_err("got mdsmap with wrong fsid\n");
-		return;
+        if (mdsc->client->monc.have_fsid) {
+		if (ceph_fsid_compare(&fsid,
+				      &mdsc->client->monc.monmap->fsid)) {
+			pr_err("got mdsmap with wrong fsid\n");
+			return;
+		}
+	} else {
+		ceph_fsid_set(&mdsc->client->monc.monmap->fsid, &fsid);
+		mdsc->client->monc.have_fsid = true;
 	}
 	epoch = ceph_decode_32(&p);
 	maplen = ceph_decode_32(&p);
@@ -2895,10 +2906,60 @@
 	ceph_msg_put(msg);
 }
 
+/*
+ * authentication
+ */
+static int get_authorizer(struct ceph_connection *con,
+			  void **buf, int *len, int *proto,
+			  void **reply_buf, int *reply_len, int force_new)
+{
+	struct ceph_mds_session *s = con->private;
+	struct ceph_mds_client *mdsc = s->s_mdsc;
+	struct ceph_auth_client *ac = mdsc->client->monc.auth;
+	int ret = 0;
+
+	if (force_new && s->s_authorizer) {
+		ac->ops->destroy_authorizer(ac, s->s_authorizer);
+		s->s_authorizer = NULL;
+	}
+	if (s->s_authorizer == NULL) {
+		if (ac->ops->create_authorizer) {
+			ret = ac->ops->create_authorizer(
+				ac, CEPH_ENTITY_TYPE_MDS,
+				&s->s_authorizer,
+				&s->s_authorizer_buf,
+				&s->s_authorizer_buf_len,
+				&s->s_authorizer_reply_buf,
+				&s->s_authorizer_reply_buf_len);
+			if (ret)
+				return ret;
+		}
+	}
+
+	*proto = ac->protocol;
+	*buf = s->s_authorizer_buf;
+	*len = s->s_authorizer_buf_len;
+	*reply_buf = s->s_authorizer_reply_buf;
+	*reply_len = s->s_authorizer_reply_buf_len;
+	return 0;
+}
+
+
+static int verify_authorizer_reply(struct ceph_connection *con, int len)
+{
+	struct ceph_mds_session *s = con->private;
+	struct ceph_mds_client *mdsc = s->s_mdsc;
+	struct ceph_auth_client *ac = mdsc->client->monc.auth;
+
+	return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
+}
+
 const static struct ceph_connection_operations mds_con_ops = {
 	.get = con_get,
 	.put = con_put,
 	.dispatch = dispatch,
+	.get_authorizer = get_authorizer,
+	.verify_authorizer_reply = verify_authorizer_reply,
 	.peer_reset = peer_reset,
 	.alloc_msg = ceph_alloc_msg,
 	.alloc_middle = ceph_alloc_middle,
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 7c43948..9faa1b2 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -100,6 +100,10 @@
 
 	struct ceph_connection s_con;
 
+	struct ceph_authorizer *s_authorizer;
+	void             *s_authorizer_buf, *s_authorizer_reply_buf;
+	size_t            s_authorizer_buf_len, s_authorizer_reply_buf_len;
+
 	/* protected by s_cap_lock */
 	spinlock_t        s_cap_lock;
 	u32               s_cap_gen;  /* inc each time we get mds stale msg */
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index d8a6a56..0b16748 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -550,6 +550,27 @@
  * Connection negotiation.
  */
 
+/* Append our authorizer (if any) to the outgoing connect handshake. */
+static void prepare_connect_authorizer(struct ceph_connection *con)
+{
+	void *auth_buf = NULL;	/* NULL with len 0 when no authorizer */
+	int auth_len = 0, auth_protocol = 0;
+
+	if (con->ops->get_authorizer)
+		con->ops->get_authorizer(con, &auth_buf, &auth_len,
+					 &auth_protocol, &con->auth_reply_buf,
+					 &con->auth_reply_buf_len,
+					 con->auth_retry);
+
+	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
+	con->out_connect.authorizer_len = cpu_to_le32(auth_len);
+
+	con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
+	con->out_kvec[con->out_kvec_left].iov_len = auth_len;
+	con->out_kvec_left++;
+	con->out_kvec_bytes += auth_len;
+}
+
 /*
  * We connected to a peer and are saying hello.
  */
@@ -592,6 +613,7 @@
 
 	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
 	     con->connect_seq, global_seq, proto);
+
 	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
 	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
 	con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -611,6 +633,8 @@
 	con->out_kvec_cur = con->out_kvec;
 	con->out_more = 0;
 	set_bit(WRITE_PENDING, &con->state);
+
+	prepare_connect_authorizer(con);
 }
 
 
@@ -777,6 +801,13 @@
 	con->in_base_pos = 0;
 }
 
+static void prepare_read_connect_retry(struct ceph_connection *con)
+{
+	dout("prepare_read_connect_retry %p\n", con);
+	con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->actual_peer_addr)
+		+ sizeof(con->peer_addr_for_me);
+}
+
 static void prepare_read_ack(struct ceph_connection *con)
 {
 	dout("prepare_read_ack %p\n", con);
@@ -853,9 +884,14 @@
 	ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
 	if (ret <= 0)
 		goto out;
+	ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
+			   con->auth_reply_buf);
+	if (ret <= 0)
+		goto out;
 
-	dout("read_partial_connect %p connect_seq = %u, global_seq = %u\n",
-	     con, le32_to_cpu(con->in_reply.connect_seq),
+	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
+	     con, (int)con->in_reply.tag,
+	     le32_to_cpu(con->in_reply.connect_seq),
 	     le32_to_cpu(con->in_reply.global_seq));
 out:
 	return ret;
@@ -1051,6 +1087,20 @@
 		set_bit(CLOSED, &con->state);  /* in case there's queued work */
 		return -1;
 
+	case CEPH_MSGR_TAG_BADAUTHORIZER:
+		con->auth_retry++;
+		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
+		     con->auth_retry);
+		if (con->auth_retry == 2) {
+			con->error_msg = "connect authorization failure";
+			reset_connection(con);
+			set_bit(CLOSED, &con->state);
+			return -1;
+		}
+		con->auth_retry = 1;
+		prepare_write_connect(con->msgr, con, 0);
+		prepare_read_connect_retry(con);
+		break;
 
 	case CEPH_MSGR_TAG_RESETSESSION:
 		/*
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 4bd85c3..f9c9f64 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -26,6 +26,12 @@
 	/* handle an incoming message. */
 	void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
 
+	/* authorize an outgoing connection */
+	int (*get_authorizer) (struct ceph_connection *con,
+			       void **buf, int *len, int *proto,
+			       void **reply_buf, int *reply_len, int force_new);
+	int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
+
 	/* protocol version mismatch */
 	void (*bad_proto) (struct ceph_connection *con);
 
@@ -144,6 +150,10 @@
 				 attempt for this connection, client */
 	u32 peer_global_seq;  /* peer's global seq for this connection */
 
+	int auth_retry;       /* true if we need a newer authorizer */
+	void *auth_reply_buf;   /* where to put the authorizer reply */
+	int auth_reply_buf_len;
+
 	/* out queue */
 	struct mutex out_mutex;
 	struct list_head out_queue;
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 95b76e7..017d5ae 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -6,6 +6,7 @@
 
 #include "mon_client.h"
 #include "super.h"
+#include "auth.h"
 #include "decode.h"
 
 /*
@@ -38,6 +39,10 @@
 	struct ceph_fsid fsid;
 	u32 epoch, num_mon;
 	u16 version;
+	u32 len;
+
+	ceph_decode_32_safe(&p, end, len, bad);
+	ceph_decode_need(&p, end, len, bad);
 
 	dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
 
@@ -95,8 +100,10 @@
 {
 	if (monc->con) {
 		dout("__close_session closing mon%d\n", monc->cur_mon);
+		ceph_con_revoke(monc->con, monc->m_auth);
 		ceph_con_close(monc->con);
 		monc->cur_mon = -1;
+		ceph_auth_reset(monc->auth);
 	}
 }
 
@@ -106,6 +113,7 @@
 static int __open_session(struct ceph_mon_client *monc)
 {
 	char r;
+	int ret;
 
 	if (monc->cur_mon < 0) {
 		get_random_bytes(&r, 1);
@@ -121,6 +129,15 @@
 		monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
 		ceph_con_open(monc->con,
 			      &monc->monmap->mon_inst[monc->cur_mon].addr);
+
+		/* initiate authentication handshake */
+		ret = ceph_auth_build_hello(monc->auth,
+					    monc->m_auth->front.iov_base,
+					    monc->m_auth->front_max);
+		monc->m_auth->front.iov_len = ret;
+		monc->m_auth->hdr.front_len = cpu_to_le32(ret);
+		ceph_msg_get(monc->m_auth);  /* keep our ref */
+		ceph_con_send(monc->con, monc->m_auth);
 	} else {
 		dout("open_session mon%d already open\n", monc->cur_mon);
 	}
@@ -139,7 +156,7 @@
 {
 	unsigned delay;
 
-	if (monc->cur_mon < 0 || monc->want_mount || __sub_expired(monc))
+	if (monc->cur_mon < 0 || __sub_expired(monc))
 		delay = 10 * HZ;
 	else
 		delay = 20 * HZ;
@@ -161,7 +178,7 @@
 		struct ceph_mon_subscribe_item *i;
 		void *p, *end;
 
-		msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 64, 0, 0, NULL);
+		msg = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, 0, 0, NULL);
 		if (!msg)
 			return;
 
@@ -173,7 +190,7 @@
 		if (monc->want_next_osdmap) {
 			dout("__send_subscribe to 'osdmap' %u\n",
 			     (unsigned)monc->have_osdmap);
-			ceph_encode_32(&p, 2);
+			ceph_encode_32(&p, 3);
 			ceph_encode_string(&p, end, "osdmap", 6);
 			i = p;
 			i->have = cpu_to_le64(monc->have_osdmap);
@@ -181,13 +198,18 @@
 			p += sizeof(*i);
 			monc->want_next_osdmap = 2;  /* requested */
 		} else {
-			ceph_encode_32(&p, 1);
+			ceph_encode_32(&p, 2);
 		}
 		ceph_encode_string(&p, end, "mdsmap", 6);
 		i = p;
 		i->have = cpu_to_le64(monc->have_mdsmap);
 		i->onetime = 0;
 		p += sizeof(*i);
+		ceph_encode_string(&p, end, "monmap", 6);
+		i = p;
+		i->have = 0;
+		i->onetime = 0;
+		p += sizeof(*i);
 
 		msg->front.iov_len = p - msg->front.iov_base;
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
@@ -256,7 +278,7 @@
 	mutex_unlock(&monc->mutex);
 }
 
-
+#if 0
 /*
  * mount
  */
@@ -264,12 +286,8 @@
 {
 	struct ceph_msg *msg;
 	struct ceph_client_mount *h;
-	int err;
 
 	dout("__request_mount\n");
-	err = __open_session(monc);
-	if (err)
-		return;
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_MOUNT, sizeof(*h), 0, 0, NULL);
 	if (IS_ERR(msg))
 		return;
@@ -279,8 +297,12 @@
 	h->monhdr.session_mon_tid = 0;
 	ceph_con_send(monc->con, msg);
 }
+#endif
 
-int ceph_monc_request_mount(struct ceph_mon_client *monc)
+/*
+ * Open a session with a monitor and start the authentication handshake.
+ */
+int ceph_monc_open_session(struct ceph_mon_client *monc)
 {
 	if (!monc->con) {
 		monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
@@ -292,12 +314,14 @@
 	}
 
 	mutex_lock(&monc->mutex);
-	__request_mount(monc);
+	__open_session(monc);
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
 	return 0;
 }
 
+#if 0
+
 /*
  * The monitor responds with mount ack indicate mount success.  The
  * included client ticket allows the client to talk to MDSs and OSDs.
@@ -372,9 +396,65 @@
 	mutex_unlock(&monc->mutex);
 	wake_up(&client->mount_wq);
 }
+#endif
+
+/*
+ * Handle an incoming monmap: decode it, verify that its fsid matches the
+ * one we already know (if any), and install it as the current monmap.
+ */
+static void ceph_monc_handle_map(struct ceph_mon_client *monc, struct ceph_msg *msg)
+{
+	struct ceph_client *client = monc->client;
+	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
+	void *p, *end;
+
+	mutex_lock(&monc->mutex);
+	dout("handle_monmap\n");
+	p = msg->front.iov_base;
+	end = p + msg->front.iov_len;
+
+	monmap = ceph_monmap_decode(p, end);
+	if (IS_ERR(monmap)) {
+		pr_err("problem decoding monmap, %d\n", (int)PTR_ERR(monmap));
+		goto out_unlock;
+	}
+	if (monc->have_fsid &&
+	    ceph_fsid_compare(&monmap->fsid, &monc->monmap->fsid)) {
+		print_hex_dump(KERN_ERR, "monmap->fsid: ", DUMP_PREFIX_NONE, 16, 1,
+			       (void *)&monmap->fsid, 16, 0);
+		print_hex_dump(KERN_ERR, "monc->monmap->fsid: ", DUMP_PREFIX_NONE, 16, 1,
+			       (void *)&monc->monmap->fsid, 16, 0);
+		pr_err("fsid mismatch, got a previous map with different fsid\n");
+		kfree(monmap);
+		goto out_unlock;
+	}
+
+	client->monc.monmap = monmap;
+	client->monc.have_fsid = true;
+	kfree(old);
+	mutex_unlock(&monc->mutex);
+	wake_up(&client->mount_wq);
+	return;
+
+out_unlock:
+	mutex_unlock(&monc->mutex);	/* error paths must not leak the lock */
+}
 
 
+/*
+ * init client info after authentication
+ */
+static void __init_authenticated_client(struct ceph_mon_client *monc)
+{
+	struct ceph_client *client = monc->client;
 
+	client->signed_ticket = NULL;
+	client->signed_ticket_len = 0;
+	client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
+	client->msgr->inst.name.num = monc->auth->global_id;
+
+	ceph_debugfs_client_init(client);
+}
 
 /*
  * statfs
@@ -414,12 +494,8 @@
 {
 	struct ceph_msg *msg;
 	struct ceph_mon_statfs *h;
-	int err;
 
 	dout("send_statfs tid %llu\n", req->tid);
-	err = __open_session(monc);
-	if (err)
-		return err;
 	msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
 	if (IS_ERR(msg))
 		return PTR_ERR(msg);
@@ -514,17 +590,14 @@
 
 	dout("monc delayed_work\n");
 	mutex_lock(&monc->mutex);
-	if (monc->want_mount) {
-		__request_mount(monc);
+	if (monc->hunting) {
+		__close_session(monc);
+		__open_session(monc);  /* continue hunting */
 	} else {
-		if (monc->hunting) {
-			__close_session(monc);
-			__open_session(monc);  /* continue hunting */
-		} else {
-			ceph_con_keepalive(monc->con);
-		}
+		ceph_con_keepalive(monc->con);
+		if (monc->auth->ops->is_authenticated(monc->auth))
+			__send_subscribe(monc);
 	}
-	__send_subscribe(monc);
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
 }
@@ -555,6 +628,7 @@
 		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
 	}
 	monc->monmap->num_mon = num_mon;
+	monc->have_fsid = false;
 
 	/* release addr memory */
 	kfree(args->mon_addr);
@@ -579,21 +653,37 @@
 
 	monc->con = NULL;
 
+	/* authentication */
+	monc->auth = ceph_auth_init(cl->mount_args->name,
+				    cl->mount_args->secret);
+	if (IS_ERR(monc->auth))
+		return PTR_ERR(monc->auth);
+	monc->auth->want_keys =
+		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
+		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
+
 	/* msg pools */
-	err = ceph_msgpool_init(&monc->msgpool_mount_ack, 4096, 1, false);
-	if (err < 0)
-		goto out;
 	err = ceph_msgpool_init(&monc->msgpool_subscribe_ack,
 			       sizeof(struct ceph_mon_subscribe_ack), 1, false);
 	if (err < 0)
-		goto out;
+		goto out_monmap;
 	err = ceph_msgpool_init(&monc->msgpool_statfs_reply,
 				sizeof(struct ceph_mon_statfs_reply), 0, false);
 	if (err < 0)
-		goto out;
+		goto out_pool1;
+	err = ceph_msgpool_init(&monc->msgpool_auth_reply, 4096, 1, false);
+	if (err < 0)
+		goto out_pool2;
+
+	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, 0, 0, NULL);
+	if (IS_ERR(monc->m_auth)) {
+		err = PTR_ERR(monc->m_auth);
+		monc->m_auth = NULL;
+		goto out_pool3;
+	}
 
 	monc->cur_mon = -1;
-	monc->hunting = false;  /* not really */
+	monc->hunting = true;
 	monc->sub_renew_after = jiffies;
 	monc->sub_sent = 0;
 
@@ -605,7 +695,16 @@
 	monc->have_mdsmap = 0;
 	monc->have_osdmap = 0;
 	monc->want_next_osdmap = 1;
-	monc->want_mount = true;
+	return 0;
+
+out_pool3:
+	ceph_msgpool_destroy(&monc->msgpool_auth_reply);
+out_pool2:
+	ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
+out_pool1:
+	ceph_msgpool_destroy(&monc->msgpool_statfs_reply);
+out_monmap:
+	kfree(monc->monmap);
 out:
 	return err;
 }
@@ -624,14 +723,44 @@
 	}
 	mutex_unlock(&monc->mutex);
 
-	ceph_msgpool_destroy(&monc->msgpool_mount_ack);
+	ceph_auth_destroy(monc->auth);
+
+	ceph_msg_put(monc->m_auth);
 	ceph_msgpool_destroy(&monc->msgpool_subscribe_ack);
 	ceph_msgpool_destroy(&monc->msgpool_statfs_reply);
+	ceph_msgpool_destroy(&monc->msgpool_auth_reply);
 
 	kfree(monc->monmap);
 }
 
 
+static void handle_auth_reply(struct ceph_mon_client *monc,
+			      struct ceph_msg *msg)
+{
+	int ret;
+
+	mutex_lock(&monc->mutex);
+	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
+				     msg->front.iov_len,
+				     monc->m_auth->front.iov_base,
+				     monc->m_auth->front_max);
+	if (ret < 0) {
+		monc->client->mount_err = ret;
+		wake_up(&monc->client->mount_wq);
+	} else if (ret > 0) {
+		monc->m_auth->front.iov_len = ret;
+		monc->m_auth->hdr.front_len = cpu_to_le32(ret);
+		ceph_msg_get(monc->m_auth);  /* keep our ref */
+		ceph_con_send(monc->con, monc->m_auth);
+	} else if (monc->auth->ops->is_authenticated(monc->auth)) {
+		dout("authenticated, starting session\n");
+		__init_authenticated_client(monc);
+		__send_subscribe(monc);
+		__resend_statfs(monc);
+	}
+	mutex_unlock(&monc->mutex);
+}
+
 /*
  * handle incoming message
  */
@@ -644,8 +773,8 @@
 		return;
 
 	switch (type) {
-	case CEPH_MSG_CLIENT_MOUNT_ACK:
-		handle_mount_ack(monc, msg);
+	case CEPH_MSG_AUTH_REPLY:
+		handle_auth_reply(monc, msg);
 		break;
 
 	case CEPH_MSG_MON_SUBSCRIBE_ACK:
@@ -656,6 +785,10 @@
 		handle_statfs_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_MON_MAP:
+		ceph_monc_handle_map(monc, msg);
+		break;
+
 	case CEPH_MSG_MDS_MAP:
 		ceph_mdsc_handle_map(&monc->client->mdsc, msg);
 		break;
@@ -682,12 +815,12 @@
 	int front = le32_to_cpu(hdr->front_len);
 
 	switch (type) {
-	case CEPH_MSG_CLIENT_MOUNT_ACK:
-		return ceph_msgpool_get(&monc->msgpool_mount_ack, front);
 	case CEPH_MSG_MON_SUBSCRIBE_ACK:
 		return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front);
 	case CEPH_MSG_STATFS_REPLY:
 		return ceph_msgpool_get(&monc->msgpool_statfs_reply, front);
+	case CEPH_MSG_AUTH_REPLY:
+		return ceph_msgpool_get(&monc->msgpool_auth_reply, front);
 	}
 	return ceph_alloc_msg(con, hdr);
 }
@@ -717,10 +850,7 @@
 	if (!monc->hunting) {
 		/* start hunting */
 		monc->hunting = true;
-		if (__open_session(monc) == 0) {
-			__send_subscribe(monc);
-			__resend_statfs(monc);
-		}
+		__open_session(monc);
 	} else {
 		/* already hunting, let's wait a bit */
 		__schedule_delayed(monc);
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h
index 9f6db45..c75b533 100644
--- a/fs/ceph/mon_client.h
+++ b/fs/ceph/mon_client.h
@@ -9,6 +9,7 @@
 
 struct ceph_client;
 struct ceph_mount_args;
+struct ceph_auth_client;
 
 /*
  * The monitor map enumerates the set of all monitors.
@@ -58,23 +59,26 @@
 	struct mutex mutex;
 	struct delayed_work delayed_work;
 
+	struct ceph_auth_client *auth;
+	struct ceph_msg *m_auth;
+
 	bool hunting;
 	int cur_mon;                       /* last monitor i contacted */
 	unsigned long sub_sent, sub_renew_after;
 	struct ceph_connection *con;
+	bool have_fsid;
 
 	/* msg pools */
-	struct ceph_msgpool msgpool_mount_ack;
 	struct ceph_msgpool msgpool_subscribe_ack;
 	struct ceph_msgpool msgpool_statfs_reply;
+	struct ceph_msgpool msgpool_auth_reply;
 
 	/* pending statfs requests */
 	struct radix_tree_root statfs_request_tree;
 	int num_statfs_requests;
 	u64 last_tid;
 
-	/* mds/osd map or mount requests */
-	bool want_mount;
+	/* mds/osd map */
 	int want_next_osdmap; /* 1 = want, 2 = want+asked */
 	u32 have_osdmap, have_mdsmap;
 
@@ -101,11 +105,11 @@
 
 extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
 
-extern int ceph_monc_request_mount(struct ceph_mon_client *monc);
-
 extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
 			       struct ceph_statfs *buf);
 
+extern int ceph_monc_open_session(struct ceph_mon_client *monc);
+
 
 
 #endif
diff --git a/fs/ceph/msgr.h b/fs/ceph/msgr.h
index 8e3ea2e..c758e8f 100644
--- a/fs/ceph/msgr.h
+++ b/fs/ceph/msgr.h
@@ -21,7 +21,7 @@
  * whenever the wire protocol changes.  try to keep this string length
  * constant.
  */
-#define CEPH_BANNER "ceph v023"
+#define CEPH_BANNER "ceph v024"
 #define CEPH_BANNER_MAX_LEN 30
 
 
@@ -46,11 +46,16 @@
 	__le64 num;
 } __attribute__ ((packed));
 
-#define CEPH_ENTITY_TYPE_MON    1
-#define CEPH_ENTITY_TYPE_MDS    2
-#define CEPH_ENTITY_TYPE_OSD    3
-#define CEPH_ENTITY_TYPE_CLIENT 4
-#define CEPH_ENTITY_TYPE_ADMIN  5
+#define CEPH_ENTITY_TYPE_MON    0x01
+#define CEPH_ENTITY_TYPE_MDS    0x02
+#define CEPH_ENTITY_TYPE_OSD    0x04
+#define CEPH_ENTITY_TYPE_CLIENT 0x08
+#define CEPH_ENTITY_TYPE_ADMIN  0x10
+#define CEPH_ENTITY_TYPE_AUTH   0x20
+
+#define CEPH_ENTITY_TYPE_ANY    0xFF
+
+extern const char *ceph_entity_type_name(int type);
 
 /*
  * entity_addr -- network address
@@ -94,6 +99,7 @@
 #define CEPH_MSGR_TAG_ACK           8  /* message ack */
 #define CEPH_MSGR_TAG_KEEPALIVE     9  /* just a keepalive byte! */
 #define CEPH_MSGR_TAG_BADPROTOVER  10  /* bad protocol version */
+#define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
 
 
 /*
@@ -104,6 +110,8 @@
 	__le32 global_seq;   /* count connections initiated by this host */
 	__le32 connect_seq;  /* count connections initiated in this session */
 	__le32 protocol_version;
+	__le32 authorizer_protocol;
+	__le32 authorizer_len;
 	__u8  flags;         /* CEPH_MSG_CONNECT_* */
 } __attribute__ ((packed));
 
@@ -112,6 +120,7 @@
 	__le32 global_seq;
 	__le32 connect_seq;
 	__le32 protocol_version;
+	__le32 authorizer_len;
 	__u8 flags;
 } __attribute__ ((packed));
 
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 0a16c4f..ca0ee68 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -11,6 +11,7 @@
 #include "osd_client.h"
 #include "messenger.h"
 #include "decode.h"
+#include "auth.h"
 
 const static struct ceph_connection_operations osd_con_ops;
 
@@ -331,6 +332,7 @@
 	osd->o_con.private = osd;
 	osd->o_con.ops = &osd_con_ops;
 	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+
 	return osd;
 }
 
@@ -880,9 +882,15 @@
 	/* verify fsid */
 	ceph_decode_need(&p, end, sizeof(fsid), bad);
 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
-	if (ceph_fsid_compare(&fsid, &osdc->client->monc.monmap->fsid)) {
-		pr_err("got osdmap with wrong fsid, ignoring\n");
-		return;
+        if (osdc->client->monc.have_fsid) {
+		if (ceph_fsid_compare(&fsid,
+			              &osdc->client->monc.monmap->fsid)) {
+			pr_err("got osdmap with wrong fsid, ignoring\n");
+			return;
+		}
+	} else {
+		ceph_fsid_set(&osdc->client->monc.monmap->fsid, &fsid);
+		osdc->client->monc.have_fsid = true;
 	}
 
 	down_write(&osdc->map_sem);
@@ -1302,10 +1310,59 @@
 	put_osd(osd);
 }
 
+/*
+ * authentication: .get_authorizer hook; returns the osd's cached authorizer
+ */
+static int get_authorizer(struct ceph_connection *con,
+			  void **buf, int *len, int *proto,
+			  void **reply_buf, int *reply_len, int force_new)
+{
+	struct ceph_osd *o = con->private;
+	struct ceph_osd_client *osdc = o->o_osdc;
+	struct ceph_auth_client *ac = osdc->client->monc.auth;
+	int ret = 0;
+
+	if (force_new && o->o_authorizer) {	/* discard the cached one */
+		ac->ops->destroy_authorizer(ac, o->o_authorizer);
+		o->o_authorizer = NULL;
+	}
+	if (o->o_authorizer == NULL) {		/* none cached: create one */
+		ret = ac->ops->create_authorizer(
+			ac, CEPH_ENTITY_TYPE_OSD,
+			&o->o_authorizer,
+			&o->o_authorizer_buf,
+			&o->o_authorizer_buf_len,
+			&o->o_authorizer_reply_buf,
+			&o->o_authorizer_reply_buf_len);
+		if (ret)
+			return ret;	/* nonzero from create_authorizer */
+	}
+
+	*proto = ac->protocol;
+	*buf = o->o_authorizer_buf;
+	*len = o->o_authorizer_buf_len;
+	*reply_buf = o->o_authorizer_reply_buf;
+	*reply_len = o->o_authorizer_reply_buf_len;
+	return 0;
+}
+
+/* .verify_authorizer_reply hook: defer to the auth client's op of that name */
+static int verify_authorizer_reply(struct ceph_connection *con, int len)
+{
+	struct ceph_osd *o = con->private;
+	struct ceph_osd_client *osdc = o->o_osdc;
+	struct ceph_auth_client *ac = osdc->client->monc.auth;
+
+	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
+}
+
+
 const static struct ceph_connection_operations osd_con_ops = {
 	.get = get_osd_con,
 	.put = put_osd_con,
 	.dispatch = dispatch,
+	.get_authorizer = get_authorizer,
+	.verify_authorizer_reply = verify_authorizer_reply,
 	.alloc_msg = alloc_msg,
 	.fault = osd_reset,
 	.alloc_middle = ceph_alloc_middle,
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 766c8dc..3d4ae65 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -13,6 +13,7 @@
 struct ceph_snap_context;
 struct ceph_osd_request;
 struct ceph_osd_client;
+struct ceph_authorizer;
 
 /*
  * completion callback for async writepages
@@ -29,6 +30,9 @@
 	struct rb_node o_node;
 	struct ceph_connection o_con;
 	struct list_head o_requests;
+	struct ceph_authorizer *o_authorizer;
+	void *o_authorizer_buf, *o_authorizer_reply_buf;
+	size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
 };
 
 /* an in-flight request */
diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
index fb23ff9..12bfb2f 100644
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -157,7 +157,6 @@
 #define CEPH_OSD_OP_MODE_WR    0x2000
 #define CEPH_OSD_OP_MODE_RMW   0x3000
 #define CEPH_OSD_OP_MODE_SUB   0x4000
-#define CEPH_OSD_OP_MODE_EXEC  0x8000
 
 #define CEPH_OSD_OP_TYPE       0x0f00
 #define CEPH_OSD_OP_TYPE_LOCK  0x0100
@@ -285,6 +284,7 @@
 	CEPH_OSD_FLAG_BALANCE_READS = 256,
 	CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */
 	CEPH_OSD_FLAG_PGOP = 1024,      /* pg op, no object */
+	CEPH_OSD_FLAG_EXEC = 2048,      /* op may exec */
 };
 
 enum {
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index fe0a596..c901395 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -128,6 +128,8 @@
 		seq_puts(m, ",noasyncreaddir");
 	if (strcmp(args->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
 		seq_printf(m, ",snapdirname=%s", args->snapdir_name);
+	if (args->name)
+		seq_printf(m, ",name=%s", args->name);
 	if (args->secret)
 		seq_puts(m, ",secret=<hidden>");
 	return 0;
@@ -224,12 +226,12 @@
 	switch (type) {
 	case CEPH_MSG_SHUTDOWN: return "shutdown";
 	case CEPH_MSG_PING: return "ping";
+	case CEPH_MSG_AUTH: return "auth";
+	case CEPH_MSG_AUTH_REPLY: return "auth_reply";
 	case CEPH_MSG_MON_MAP: return "mon_map";
 	case CEPH_MSG_MON_GET_MAP: return "mon_get_map";
 	case CEPH_MSG_MON_SUBSCRIBE: return "mon_subscribe";
 	case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
-	case CEPH_MSG_CLIENT_MOUNT: return "client_mount";
-	case CEPH_MSG_CLIENT_MOUNT_ACK: return "client_mount_ack";
 	case CEPH_MSG_STATFS: return "statfs";
 	case CEPH_MSG_STATFS_REPLY: return "statfs_reply";
 	case CEPH_MSG_MDS_MAP: return "mds_map";
@@ -267,6 +269,7 @@
 	Opt_last_int,
 	/* int args above */
 	Opt_snapdirname,
+	Opt_name,
 	Opt_secret,
 	Opt_last_string,
 	/* string args above */
@@ -293,6 +296,7 @@
 	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
 	/* int args above */
 	{Opt_snapdirname, "snapdirname=%s"},
+	{Opt_name, "name=%s"},
 	{Opt_secret, "secret=%s"},
 	/* string args above */
 	{Opt_ip, "ip=%s"},
@@ -407,6 +411,11 @@
 					      argstr[0].to-argstr[0].from,
 					      GFP_KERNEL);
 			break;
+		case Opt_name:
+			args->name = kstrndup(argstr[0].from,
+					      argstr[0].to-argstr[0].from,
+					      GFP_KERNEL);
+			break;
 		case Opt_secret:
 			args->secret = kstrndup(argstr[0].from,
 						argstr[0].to-argstr[0].from,
@@ -476,6 +485,8 @@
 	dout("destroy_mount_args %p\n", args);
 	kfree(args->snapdir_name);
 	args->snapdir_name = NULL;
+	kfree(args->name);
+	args->name = NULL;
 	kfree(args->secret);
 	args->secret = NULL;
 	kfree(args);
@@ -657,27 +668,23 @@
 		client->msgr->nocrc = ceph_test_opt(client, NOCRC);
 	}
 
-	/* send mount request, and wait for mon, mds, and osd maps */
-	err = ceph_monc_request_mount(&client->monc);
+	/* open session, and wait for mon, mds, and osd maps */
+	err = ceph_monc_open_session(&client->monc);
 	if (err < 0)
 		goto out;
 
-	while (!have_mon_map(client) && !client->mount_err) {
+	while (!have_mon_map(client)) {
 		err = -EIO;
 		if (timeout && time_after_eq(jiffies, started + timeout))
 			goto out;
 
 		/* wait */
-		dout("mount waiting for mount\n");
-		err = wait_event_interruptible_timeout(client->mount_wq,
-			       client->mount_err || have_mon_map(client),
+		dout("mount waiting for mon_map\n");
+		err = wait_event_interruptible_timeout(client->mount_wq, /* FIXME */
+			       have_mon_map(client),
 			       timeout);
 		if (err == -EINTR || err == -ERESTARTSYS)
 			goto out;
-		if (client->mount_err) {
-			err = client->mount_err;
-			goto out;
-		}
 	}
 
 	dout("mount opening root\n");
@@ -795,7 +802,6 @@
 		client->backing_dev_info.ra_pages =
 			(client->mount_args->rsize + PAGE_CACHE_SIZE - 1)
 			>> PAGE_SHIFT;
-
 	err = bdi_register_dev(&client->backing_dev_info, sb->s_dev);
 	return err;
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 8aa1ffb..e0e8130 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -61,6 +61,7 @@
 	int max_readdir;      /* max readdir size */
 	int osd_timeout;
 	char *snapdir_name;   /* default ".snap" */
+	char *name;
 	char *secret;
 	int cap_release_safety;
 };
@@ -75,6 +76,7 @@
 #define CEPH_MSG_MAX_DATA_LEN	(16*1024*1024)
 
 #define CEPH_SNAPDIRNAME_DEFAULT ".snap"
+#define CEPH_AUTH_NAME_DEFAULT   "guest"
 
 /*
  * Delay telling the MDS we no longer want caps, in case we reopen