orangefs: hopefully saner op refcounting and locking

* create with refcount 1
* make op_release() decrement the refcount and free the op once it hits
  zero (i.e. it has taken over the job of the old put_op()).
* mark an op when its submitter has given up waiting; from that point on
  nobody else may move it between the lists, change its state, etc.
* have the daemon's read/write_iter grab a reference when picking an op
  and *always* drop it in the end
* don't put an op into the in-progress hash until we know it has been
  successfully passed to the daemon (the intended flow is sketched below)
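
The daemon-side flow this adds up to is roughly the sketch below.  It is
illustrative only, not code from this patch: service_op_sketch(),
copy_upcall_to_daemon() and htab_bucket() are made-up stand-ins for pieces
of orangefs_devreq_read()/write_iter(), and the request-list names and the
op->list member are assumptions about the driver rather than something
shown in this diff.

  static int service_op_sketch(void)
  {
          struct orangefs_kernel_op_s *op;
          int ret;

          /* pick an op off the request list, taking our own reference */
          spin_lock(&orangefs_request_list_lock);
          op = list_first_entry_or_null(&orangefs_request_list,
                                        struct orangefs_kernel_op_s, list);
          if (op) {
                  list_del_init(&op->list);
                  get_op(op);             /* the daemon's reference */
          }
          spin_unlock(&orangefs_request_list_lock);
          if (!op)
                  return -EAGAIN;

          ret = copy_upcall_to_daemon(op);        /* hypothetical helper */

          if (!ret) {
                  /* only hash the op once the handover has succeeded */
                  spin_lock(&htab_in_progress_lock);      /* hash lock first */
                  spin_lock(&op->lock);   /* op->lock now nests inside it */
                  if (!op_state_given_up(op)) {
                          op->op_state = OP_VFS_STATE_INPROGR;
                          /* htab_bucket(): hypothetical bucket lookup */
                          list_add(&op->list, htab_bucket(op->tag));
                  }
                  spin_unlock(&op->lock);
                  spin_unlock(&htab_in_progress_lock);
          }

          /* always drop the daemon's reference; may be the final put */
          op_release(op);
          return ret;
  }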

* move op->lock _below_ htab_in_progress_lock in the lock ordering, i.e.
  op->lock is now taken while already holding the hash lock (and make sure
  purge_inprogress_ops() actually takes it); see the sketch below
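
With that ordering purge_inprogress_ops() can take op->lock while already
holding the hash lock.  Again a sketch only: HTAB_SIZE and the
htab_in_progress names follow the shorthand above, not necessarily the
exact identifiers in devorangefs-req.c, and op->list is assumed to be the
hash linkage.

  static void purge_inprogress_ops_sketch(void)
  {
          struct orangefs_kernel_op_s *op, *next;
          int i;

          for (i = 0; i < HTAB_SIZE; i++) {
                  spin_lock(&htab_in_progress_lock);      /* outer lock */
                  list_for_each_entry_safe(op, next,
                                           &htab_in_progress[i], list) {
                          list_del_init(&op->list);
                          spin_lock(&op->lock);           /* inner lock */
                          op->op_state |= OP_VFS_STATE_PURGED;
                          spin_unlock(&op->lock);
                          /* ... wake the submitter so it sees the purge ... */
                  }
                  spin_unlock(&htab_in_progress_lock);
          }
  }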

Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
diff --git a/fs/orangefs/orangefs-kernel.h b/fs/orangefs/orangefs-kernel.h
index 4219b2f..f96ec3d 100644
--- a/fs/orangefs/orangefs-kernel.h
+++ b/fs/orangefs/orangefs-kernel.h
@@ -94,6 +94,7 @@
  * serviced - op has matching downcall; ok
  * purged   - op has to start a timer since client-core
  *            exited uncleanly before servicing op
+ * given up - submitter has given up waiting for it
  */
 enum orangefs_vfs_op_states {
 	OP_VFS_STATE_UNKNOWN = 0,
@@ -101,30 +102,9 @@
 	OP_VFS_STATE_INPROGR = 2,
 	OP_VFS_STATE_SERVICED = 4,
 	OP_VFS_STATE_PURGED = 8,
+	OP_VFS_STATE_GIVEN_UP = 16,
 };
 
-#define get_op(op)					\
-	do {						\
-		atomic_inc(&(op)->ref_count);	\
-		gossip_debug(GOSSIP_DEV_DEBUG,	\
-			"(get) Alloced OP (%p:%llu)\n",	\
-			op,				\
-			llu((op)->tag));		\
-	} while (0)
-
-#define put_op(op)							\
-	do {								\
-		if (atomic_sub_and_test(1, &(op)->ref_count) == 1) {  \
-			gossip_debug(GOSSIP_DEV_DEBUG,		\
-				"(put) Releasing OP (%p:%llu)\n",	\
-				op,					\
-				llu((op)->tag));			\
-			op_release(op);					\
-			}						\
-	} while (0)
-
-#define op_wait(op) (atomic_read(&(op)->ref_count) <= 2 ? 0 : 1)
-
 /*
  * Defines for controlling whether I/O upcalls are for async or sync operations
  */
@@ -258,6 +238,25 @@
 #define op_state_in_progress(op) ((op)->op_state & OP_VFS_STATE_INPROGR)
 #define op_state_serviced(op)    ((op)->op_state & OP_VFS_STATE_SERVICED)
 #define op_state_purged(op)      ((op)->op_state & OP_VFS_STATE_PURGED)
+#define op_state_given_up(op)    ((op)->op_state & OP_VFS_STATE_GIVEN_UP)
+
+static inline void get_op(struct orangefs_kernel_op_s *op)
+{
+	atomic_inc(&op->ref_count);
+	gossip_debug(GOSSIP_DEV_DEBUG,
+			"(get) Alloced OP (%p:%llu)\n",	op, llu(op->tag));
+}
+
+void __op_release(struct orangefs_kernel_op_s *op);
+
+static inline void op_release(struct orangefs_kernel_op_s *op)
+{
+	if (atomic_dec_and_test(&op->ref_count)) {
+		gossip_debug(GOSSIP_DEV_DEBUG,
+			"(put) Releasing OP (%p:%llu)\n", op, llu(op->tag));
+		__op_release(op);
+	}
+}
 
 /* per inode private orangefs info */
 struct orangefs_inode_s {
@@ -459,7 +458,6 @@
 int op_cache_finalize(void);
 struct orangefs_kernel_op_s *op_alloc(__s32 type);
 char *get_opname_string(struct orangefs_kernel_op_s *new_op);
-void op_release(struct orangefs_kernel_op_s *op);
 
 int dev_req_cache_initialize(void);
 int dev_req_cache_finalize(void);
@@ -665,11 +663,9 @@
 do {								\
 	if (!op_state_serviced(new_op)) {			\
 		orangefs_cancel_op_in_progress(new_op->tag);	\
-		op_release(new_op);				\
 	} else {						\
 		wake_up_daemon_for_return(new_op);		\
 	}							\
-	new_op = NULL;						\
 	orangefs_bufmap_put(bufmap, buffer_index);				\
 	buffer_index = -1;					\
 } while (0)