ceph: add imported caps when handling cap export message

Version 3 cap export message includes information about the imported
caps. It allows us to add the imported caps if the corresponding cap
import message still hasn't been received.

This allow us to handle situation that the importer MDS crashes and
the cap import message is missing.

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 98f3ca4..1754338 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -555,21 +555,34 @@
 		cap->ci = ci;
 		__insert_cap_node(ci, cap);
 
-		/* clear out old exporting info?  (i.e. on cap import) */
-		if (ci->i_cap_exporting_mds == mds) {
-			ci->i_cap_exporting_issued = 0;
-			ci->i_cap_exporting_mseq = 0;
-			ci->i_cap_exporting_mds = -1;
-		}
-
 		/* add to session cap list */
 		cap->session = session;
 		spin_lock(&session->s_cap_lock);
 		list_add_tail(&cap->session_caps, &session->s_caps);
 		session->s_nr_caps++;
 		spin_unlock(&session->s_cap_lock);
-	} else if (new_cap)
-		ceph_put_cap(mdsc, new_cap);
+	} else {
+		if (new_cap)
+			ceph_put_cap(mdsc, new_cap);
+
+		/*
+		 * auth mds of the inode changed. we received the cap export
+		 * message, but still haven't received the cap import message.
+		 * handle_cap_export() updated the new auth MDS' cap.
+		 *
+		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
+		 * a message that was send before the cap import message. So
+		 * don't remove caps.
+		 */
+		if (ceph_seq_cmp(seq, cap->seq) <= 0) {
+			WARN_ON(cap != ci->i_auth_cap);
+			WARN_ON(cap->cap_id != cap_id);
+			seq = cap->seq;
+			mseq = cap->mseq;
+			issued |= cap->issued;
+			flags |= CEPH_CAP_FLAG_AUTH;
+		}
+	}
 
 	if (!ci->i_snap_realm) {
 		/*
@@ -612,15 +625,8 @@
 		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
 			ci->i_auth_cap = cap;
 		ci->i_cap_exporting_issued = 0;
-	} else if (ci->i_auth_cap == cap) {
-		ci->i_auth_cap = NULL;
-		spin_lock(&mdsc->cap_dirty_lock);
-		if (!list_empty(&ci->i_dirty_item)) {
-			dout(" moving %p to cap_dirty_migrating\n", inode);
-			list_move(&ci->i_dirty_item,
-				  &mdsc->cap_dirty_migrating);
-		}
-		spin_unlock(&mdsc->cap_dirty_lock);
+	} else {
+		WARN_ON(ci->i_auth_cap == cap);
 	}
 
 	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
@@ -889,7 +895,7 @@
  */
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
-	return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_mds >= 0;
+	return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
 }
 
 int ceph_is_any_caps(struct inode *inode)
@@ -1396,13 +1402,10 @@
 				ci->i_snap_realm->cached_context);
 		dout(" inode %p now dirty snapc %p auth cap %p\n",
 		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
+		WARN_ON(!ci->i_auth_cap);
 		BUG_ON(!list_empty(&ci->i_dirty_item));
 		spin_lock(&mdsc->cap_dirty_lock);
-		if (ci->i_auth_cap)
-			list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
-		else
-			list_add(&ci->i_dirty_item,
-				 &mdsc->cap_dirty_migrating);
+		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
 		spin_unlock(&mdsc->cap_dirty_lock);
 		if (ci->i_flushing_caps == 0) {
 			ihold(inode);
@@ -2421,6 +2424,22 @@
 	dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
 		inode->i_size);
 
+
+	/*
+	 * auth mds of the inode changed. we received the cap export message,
+	 * but still haven't received the cap import message. handle_cap_export
+	 * updated the new auth MDS' cap.
+	 *
+	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
+	 * that was sent before the cap import message. So don't remove caps.
+	 */
+	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
+		WARN_ON(cap != ci->i_auth_cap);
+		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
+		seq = cap->seq;
+		newcaps |= cap->issued;
+	}
+
 	/*
 	 * If CACHE is being revoked, and we have no dirty buffers,
 	 * try to invalidate (once).  (If there are dirty buffers, we
@@ -2447,6 +2466,7 @@
 	issued |= implemented | __ceph_caps_dirty(ci);
 
 	cap->cap_gen = session->s_cap_gen;
+	cap->seq = seq;
 
 	__check_cap_issue(ci, cap, newcaps);
 
@@ -2497,6 +2517,10 @@
 			    le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
 			    &atime);
 
+
+	/* file layout may have changed */
+	ci->i_layout = grant->layout;
+
 	/* max size increase? */
 	if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
 		dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
@@ -2525,11 +2549,6 @@
 			check_caps = 1;
 	}
 
-	cap->seq = seq;
-
-	/* file layout may have changed */
-	ci->i_layout = grant->layout;
-
 	/* revocation, grant, or no-op? */
 	if (cap->issued & ~newcaps) {
 		int revoking = cap->issued & ~newcaps;
@@ -2755,65 +2774,114 @@
  * caller holds s_mutex
  */
 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
-			      struct ceph_mds_session *session,
-			      int *open_target_sessions)
+			      struct ceph_mds_cap_peer *ph,
+			      struct ceph_mds_session *session)
 {
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+	struct ceph_mds_session *tsession = NULL;
+	struct ceph_cap *cap, *tcap;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int mds = session->s_mds;
+	u64 t_cap_id;
 	unsigned mseq = le32_to_cpu(ex->migrate_seq);
-	struct ceph_cap *cap = NULL, *t;
-	struct rb_node *p;
-	int remember = 1;
+	unsigned t_seq, t_mseq;
+	int target, issued;
+	int mds = session->s_mds;
 
-	dout("handle_cap_export inode %p ci %p mds%d mseq %d\n",
-	     inode, ci, mds, mseq);
-
-	spin_lock(&ci->i_ceph_lock);
-
-	/* make sure we haven't seen a higher mseq */
-	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
-		t = rb_entry(p, struct ceph_cap, ci_node);
-		if (ceph_seq_cmp(t->mseq, mseq) > 0) {
-			dout(" higher mseq on cap from mds%d\n",
-			     t->session->s_mds);
-			remember = 0;
-		}
-		if (t->session->s_mds == mds)
-			cap = t;
+	if (ph) {
+		t_cap_id = le64_to_cpu(ph->cap_id);
+		t_seq = le32_to_cpu(ph->seq);
+		t_mseq = le32_to_cpu(ph->mseq);
+		target = le32_to_cpu(ph->mds);
+	} else {
+		t_cap_id = t_seq = t_mseq = 0;
+		target = -1;
 	}
 
-	if (cap) {
-		if (remember) {
-			/* make note */
-			ci->i_cap_exporting_mds = mds;
-			ci->i_cap_exporting_mseq = mseq;
-			ci->i_cap_exporting_issued = cap->issued;
+	dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
+	     inode, ci, mds, mseq, target);
+retry:
+	spin_lock(&ci->i_ceph_lock);
+	cap = __get_cap_for_mds(ci, mds);
+	if (!cap)
+		goto out_unlock;
 
-			/*
-			 * make sure we have open sessions with all possible
-			 * export targets, so that we get the matching IMPORT
-			 */
-			*open_target_sessions = 1;
+	if (target < 0) {
+		__ceph_remove_cap(cap, false);
+		goto out_unlock;
+	}
 
-			/*
-			 * we can't flush dirty caps that we've seen the
-			 * EXPORT but no IMPORT for
-			 */
-			spin_lock(&mdsc->cap_dirty_lock);
-			if (!list_empty(&ci->i_dirty_item)) {
-				dout(" moving %p to cap_dirty_migrating\n",
-				     inode);
-				list_move(&ci->i_dirty_item,
-					  &mdsc->cap_dirty_migrating);
+	/*
+	 * now we know we haven't received the cap import message yet
+	 * because the exported cap still exist.
+	 */
+
+	issued = cap->issued;
+	WARN_ON(issued != cap->implemented);
+
+	tcap = __get_cap_for_mds(ci, target);
+	if (tcap) {
+		/* already have caps from the target */
+		if (tcap->cap_id != t_cap_id ||
+		    ceph_seq_cmp(tcap->seq, t_seq) < 0) {
+			dout(" updating import cap %p mds%d\n", tcap, target);
+			tcap->cap_id = t_cap_id;
+			tcap->seq = t_seq - 1;
+			tcap->issue_seq = t_seq - 1;
+			tcap->mseq = t_mseq;
+			tcap->issued |= issued;
+			tcap->implemented |= issued;
+			if (cap == ci->i_auth_cap)
+				ci->i_auth_cap = tcap;
+			if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
+				spin_lock(&mdsc->cap_dirty_lock);
+				list_move_tail(&ci->i_flushing_item,
+					       &tcap->session->s_cap_flushing);
+				spin_unlock(&mdsc->cap_dirty_lock);
 			}
-			spin_unlock(&mdsc->cap_dirty_lock);
 		}
 		__ceph_remove_cap(cap, false);
+		goto out_unlock;
 	}
-	/* else, we already released it */
+
+	if (tsession) {
+		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
+		spin_unlock(&ci->i_ceph_lock);
+		/* add placeholder for the export tagert */
+		ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
+			     t_seq - 1, t_mseq, (u64)-1, flag, NULL);
+		goto retry;
+	}
 
 	spin_unlock(&ci->i_ceph_lock);
+	mutex_unlock(&session->s_mutex);
+
+	/* open target session */
+	tsession = ceph_mdsc_open_export_target_session(mdsc, target);
+	if (!IS_ERR(tsession)) {
+		if (mds > target) {
+			mutex_lock(&session->s_mutex);
+			mutex_lock_nested(&tsession->s_mutex,
+					  SINGLE_DEPTH_NESTING);
+		} else {
+			mutex_lock(&tsession->s_mutex);
+			mutex_lock_nested(&session->s_mutex,
+					  SINGLE_DEPTH_NESTING);
+		}
+		ceph_add_cap_releases(mdsc, tsession);
+	} else {
+		WARN_ON(1);
+		tsession = NULL;
+		target = -1;
+	}
+	goto retry;
+
+out_unlock:
+	spin_unlock(&ci->i_ceph_lock);
+	mutex_unlock(&session->s_mutex);
+	if (tsession) {
+		mutex_unlock(&tsession->s_mutex);
+		ceph_put_mds_session(tsession);
+	}
 }
 
 /*
@@ -2915,7 +2983,6 @@
 	void *flock;
 	void *end;
 	u32 flock_len;
-	int open_target_sessions = 0;
 
 	dout("handle_caps from mds%d\n", mds);
 
@@ -2954,6 +3021,9 @@
 			if (p + sizeof(*peer) > end)
 				goto bad;
 			peer = p;
+		} else if (op == CEPH_CAP_OP_EXPORT) {
+			/* recorded in unused fields */
+			peer = (void *)&h->size;
 		}
 	}
 
@@ -2989,8 +3059,8 @@
 		goto done;
 
 	case CEPH_CAP_OP_EXPORT:
-		handle_cap_export(inode, h, session, &open_target_sessions);
-		goto done;
+		handle_cap_export(inode, h, peer, session);
+		goto done_unlocked;
 
 	case CEPH_CAP_OP_IMPORT:
 		handle_cap_import(mdsc, inode, h, peer, session,
@@ -3045,8 +3115,6 @@
 done_unlocked:
 	if (inode)
 		iput(inode);
-	if (open_target_sessions)
-		ceph_mdsc_open_export_target_sessions(mdsc, session);
 	return;
 
 bad: