tipc: Fix race condition when creating socket or native port

This patch eliminates the (very remote) chance of a crash resulting
from a partially initialized socket or native port unexpectedly
receiving a message.  Now, during the creation of a socket or native
port, the underlying generic port's lock is not released until all
initialization required to handle incoming messages has been done.

Signed-off-by: Allan Stephens <allan.stephens@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/include/net/tipc/tipc_port.h b/include/net/tipc/tipc_port.h
index 11105bc..9923e41 100644
--- a/include/net/tipc/tipc_port.h
+++ b/include/net/tipc/tipc_port.h
@@ -84,7 +84,8 @@
 u32 tipc_createport_raw(void *usr_handle,
 			u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
 			void (*wakeup)(struct tipc_port *),
-			const u32 importance);
+			const u32 importance,
+			struct tipc_port **tp_ptr);
 
 int tipc_reject_msg(struct sk_buff *buf, u32 err);
 
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 2f58064..757de38 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -211,15 +211,18 @@
 }
 
 /**
- * tipc_createport_raw - create a native TIPC port
+ * tipc_createport_raw - create a generic TIPC port
  *
- * Returns local port reference
+ * Returns port reference, or 0 if unable to create it
+ *
+ * Note: The newly created port is returned in the locked state.
  */
 
 u32 tipc_createport_raw(void *usr_handle,
 			u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
 			void (*wakeup)(struct tipc_port *),
-			const u32 importance)
+			const u32 importance,
+			struct tipc_port **tp_ptr)
 {
 	struct port *p_ptr;
 	struct tipc_msg *msg;
@@ -237,7 +240,6 @@
 		return 0;
 	}
 
-	tipc_port_lock(ref);
 	p_ptr->publ.usr_handle = usr_handle;
 	p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
 	p_ptr->publ.ref = ref;
@@ -262,7 +264,7 @@
 	INIT_LIST_HEAD(&p_ptr->port_list);
 	list_add_tail(&p_ptr->port_list, &ports);
 	spin_unlock_bh(&tipc_port_list_lock);
-	tipc_port_unlock(p_ptr);
+	*tp_ptr = &p_ptr->publ;
 	return ref;
 }
 
@@ -1053,6 +1055,7 @@
 {
 	struct user_port *up_ptr;
 	struct port *p_ptr;
+	struct tipc_port *tp_ptr;
 	u32 ref;
 
 	up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
@@ -1060,12 +1063,13 @@
 		warn("Port creation failed, no memory\n");
 		return -ENOMEM;
 	}
-	ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance);
-	p_ptr = tipc_port_lock(ref);
-	if (!p_ptr) {
+	ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup,
+				  importance, &tp_ptr);
+	if (ref == 0) {
 		kfree(up_ptr);
 		return -ENOMEM;
 	}
+	p_ptr = (struct port *)tp_ptr;
 
 	p_ptr->user_port = up_ptr;
 	up_ptr->user_ref = user_ref;
diff --git a/net/tipc/ref.c b/net/tipc/ref.c
index 89cbab2..a101de8 100644
--- a/net/tipc/ref.c
+++ b/net/tipc/ref.c
@@ -142,9 +142,13 @@
 /**
  * tipc_ref_acquire - create reference to an object
  *
- * Return a unique reference value which can be translated back to the pointer
- * 'object' at a later time.  Also, pass back a pointer to the lock protecting
- * the object, but without locking it.
+ * Register an object pointer in reference table and lock the object.
+ * Returns a unique reference value that is used from then on to retrieve the
+ * object pointer, or to determine that the object has been deregistered.
+ *
+ * Note: The object is returned in the locked state so that the caller can
+ * register a partially initialized object, without running the risk that
+ * the object will be accessed before initialization is complete.
  */
 
 u32 tipc_ref_acquire(void *object, spinlock_t **lock)
@@ -178,13 +182,13 @@
 		ref = (next_plus_upper & ~index_mask) + index;
 		entry->ref = ref;
 		entry->object = object;
-		spin_unlock_bh(&entry->lock);
 		*lock = &entry->lock;
 	}
 	else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
 		index = tipc_ref_table.init_point++;
 		entry = &(tipc_ref_table.entries[index]);
 		spin_lock_init(&entry->lock);
+		spin_lock_bh(&entry->lock);
 		ref = tipc_ref_table.start_mask + index;
 		entry->ref = ref;
 		entry->object = object;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 230f9ca..38f4879 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -188,6 +188,7 @@
 	const struct proto_ops *ops;
 	socket_state state;
 	struct sock *sk;
+	struct tipc_port *tp_ptr;
 	u32 portref;
 
 	/* Validate arguments */
@@ -225,7 +226,7 @@
 	/* Allocate TIPC port for socket to use */
 
 	portref = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
-				      TIPC_LOW_IMPORTANCE);
+				      TIPC_LOW_IMPORTANCE, &tp_ptr);
 	if (unlikely(portref == 0)) {
 		sk_free(sk);
 		return -ENOMEM;
@@ -241,6 +242,8 @@
 	sk->sk_backlog_rcv = backlog_rcv;
 	tipc_sk(sk)->p = tipc_get_port(portref);
 
+	spin_unlock_bh(tp_ptr->lock);
+
 	if (sock->state == SS_READY) {
 		tipc_set_portunreturnable(portref, 1);
 		if (sock->type == SOCK_DGRAM)
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 8c01ccd..8f8d0a6 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -474,6 +474,7 @@
 		kfree(subscriber);
 		return;
 	}
+	spin_unlock_bh(subscriber->lock);
 
 	/* Establish a connection to subscriber */