orangefs: have ..._clean_interrupted_...() wait for copy to/from daemon

* turn all those list_del(&op->list) into list_del_init(), so that
  list_empty(&op->list) reliably says whether an op is on any list at all.
* don't let control device ->read()/->write_iter() pick ops that have
  already been given up.
* have orangefs_clean_interrupted_operation() notice when an op is currently
  being copied to/from the daemon (by said ->read()/->write_iter()) and
  wait for that copy to finish.
* when we are done copying to/from the daemon and find that the op has been
  given up while we were copying, wake the waiting ..._clean_interrupted_...

As a result, we are guaranteed that orangefs_clean_interrupted_operation(op)
does not return until nobody else can see op.  Moreover, we no longer need
to play with op refcounts.  A sketch of the waiting side follows.
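
For reference, here is roughly what the waitqueue.c side (not shown in the
diff below) boils down to.  The OP_VFS_STATE_GIVEN_UP flag and the
op_state_waiting()/op_state_in_progress() predicates are assumed from
context, so read this as a sketch rather than the literal hunk:

	/* called with op->lock held */
	op->op_state |= OP_VFS_STATE_GIVEN_UP;

	if (list_empty(&op->list)) {
		/*
		 * Off both the request list and the in-progress htable:
		 * ->read()/->write_iter() is copying the op to/from the
		 * daemon right now.  Wait for that side to notice the
		 * given-up state and call complete(&op->waitq).
		 */
		spin_unlock(&op->lock);
		wait_for_completion(&op->waitq);
	} else if (op_state_waiting(op)) {
		/* upcall not read yet; unhook from the request list */
		spin_unlock(&op->lock);
		spin_lock(&orangefs_request_list_lock);
		list_del_init(&op->list);
		spin_unlock(&orangefs_request_list_lock);
	} else if (op_state_in_progress(op)) {
		/* between ->read() and ->write_iter(); unhook from htable */
		spin_unlock(&op->lock);
		spin_lock(&htable_ops_in_progress_lock);
		list_del_init(&op->list);
		spin_unlock(&htable_ops_in_progress_lock);
	} else {
		spin_unlock(&op->lock);
		gossip_err("interrupted op in unexpected state 0x%x\n",
			   op->op_state);
	}

The list_del_init() conversion in the first bullet is what makes the
list_empty() test above a reliable "currently being copied" check.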

Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
diff --git a/fs/orangefs/devorangefs-req.c b/fs/orangefs/devorangefs-req.c
index 89c282a..f7914f5 100644
--- a/fs/orangefs/devorangefs-req.c
+++ b/fs/orangefs/devorangefs-req.c
@@ -58,9 +58,9 @@
 				 next,
 				 &htable_ops_in_progress[index],
 				 list) {
-		if (op->tag == tag && !op_state_purged(op)) {
+		if (op->tag == tag && !op_state_purged(op) &&
+		    !op_state_given_up(op)) {
 			list_del_init(&op->list);
-			get_op(op); /* increase ref count. */
 			spin_unlock(&htable_ops_in_progress_lock);
 			return op;
 		}
@@ -133,7 +133,7 @@
 		__s32 fsid;
 		/* This lock is held past the end of the loop when we break. */
 		spin_lock(&op->lock);
-		if (unlikely(op_state_purged(op))) {
+		if (unlikely(op_state_purged(op) || op_state_given_up(op))) {
 			spin_unlock(&op->lock);
 			continue;
 		}
@@ -199,13 +199,12 @@
 	 */
 	if (op_state_in_progress(cur_op) || op_state_serviced(cur_op)) {
 		gossip_err("orangefs: ERROR: Current op already queued.\n");
-		list_del(&cur_op->list);
+		list_del_init(&cur_op->list);
 		spin_unlock(&cur_op->lock);
 		spin_unlock(&orangefs_request_list_lock);
 		return -EAGAIN;
 	}
 	list_del_init(&cur_op->list);
-	get_op(op);
 	spin_unlock(&orangefs_request_list_lock);
 
 	spin_unlock(&cur_op->lock);
@@ -230,7 +229,7 @@
 	if (unlikely(op_state_given_up(cur_op))) {
 		spin_unlock(&cur_op->lock);
 		spin_unlock(&htable_ops_in_progress_lock);
-		op_release(cur_op);
+		complete(&cur_op->waitq);
 		goto restart;
 	}
 
@@ -242,7 +241,6 @@
 	orangefs_devreq_add_op(cur_op);
 	spin_unlock(&cur_op->lock);
 	spin_unlock(&htable_ops_in_progress_lock);
-	op_release(cur_op);
 
 	/* The client only asks to read one size buffer. */
 	return MAX_DEV_REQ_UPSIZE;
@@ -258,10 +256,12 @@
 	if (likely(!op_state_given_up(cur_op))) {
 		set_op_state_waiting(cur_op);
 		list_add(&cur_op->list, &orangefs_request_list);
+		spin_unlock(&cur_op->lock);
+	} else {
+		spin_unlock(&cur_op->lock);
+		complete(&cur_op->waitq);
 	}
-	spin_unlock(&cur_op->lock);
 	spin_unlock(&orangefs_request_list_lock);
-	op_release(cur_op);
 	return -EFAULT;
 }
 
@@ -405,11 +405,11 @@
 		put_cancel(op);
 	} else if (unlikely(op_state_given_up(op))) {
 		spin_unlock(&op->lock);
+		complete(&op->waitq);
 	} else {
 		set_op_state_serviced(op);
 		spin_unlock(&op->lock);
 	}
-	op_release(op);
 	return ret;
 
 Efault: