tipc: convert topology server to use new server facility

As the new TIPC server infrastructure has been introduced, we can
now convert the TIPC topology server to it.  We get two benefits
from doing this:

1) It simplifies the topology server locking policy.  In the
original locking policy, we placed one spin lock pointer in the
tipc_subscriber structure to reuse the lock of the subscriber's
server port, controlling access to members of tipc_subscriber
instance.  That is, we only used one lock to ensure both
tipc_port and tipc_subscriber members were safely accessed.

Now we introduce another spin lock for tipc_subscriber structure
only protecting themselves, to get a finer granularity locking
policy.  Moreover, the change will allow us to make the topology
server code more readable and maintainable.

2) It fixes a bug where sent subscription events may be lost when
the topology port is congested.  Using the new service, the
topology server now queues sent events into an outgoing buffer,
and then wakes up a sender process which has been blocked in
workqueue context.  The process will keep picking events from the
buffer and send them to their respective subscribers, using the
kernel socket interface, until the buffer is empty. Even if the
socket is congested during transmission there is no risk that
events may be dropped, since the sender process may block when
needed.

Some minor reordering of initialization is done, since we now
have a scenario where the topology server must be started after
socket initialization has taken place, as the former depends
on the latter.  And overall, we see a simplification of the
TIPC subscriber code in making this changeover.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/core.c b/net/tipc/core.c
index b0e42a0..15bbe99 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -2,7 +2,7 @@
  * net/tipc/core.c: TIPC module code
  *
  * Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
+ * Copyright (c) 2005-2006, 2010-2013, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -137,8 +137,6 @@
 	if (!res)
 		res = tipc_nametbl_init();
 	if (!res)
-		res = tipc_subscr_start();
-	if (!res)
 		res = tipc_cfg_init();
 	if (!res)
 		res = tipc_netlink_start();
@@ -146,6 +144,8 @@
 		res = tipc_socket_init();
 	if (!res)
 		res = tipc_register_sysctl();
+	if (!res)
+		res = tipc_subscr_start();
 	if (res)
 		tipc_core_stop();
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index bd8e2cd..d025415 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -402,7 +402,8 @@
 	else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
 		return -EAFNOSUPPORT;
 
-	if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
+	if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
+	    (addr->addr.nameseq.type != TIPC_TOP_SRV))
 		return -EACCES;
 
 	return (addr->scope > 0) ?
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 6b42d47..f6be92a 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -2,7 +2,7 @@
  * net/tipc/subscr.c: TIPC network topology service
  *
  * Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005-2007, 2010-2011, Wind River Systems
+ * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -41,33 +41,42 @@
 
 /**
  * struct tipc_subscriber - TIPC network topology subscriber
- * @port_ref: object reference to server port connecting to subscriber
- * @lock: pointer to spinlock controlling access to subscriber's server port
- * @subscriber_list: adjacent subscribers in top. server's list of subscribers
+ * @conid: connection identifier to server connecting to subscriber
+ * @lock: controll access to subscriber
  * @subscription_list: list of subscription objects for this subscriber
  */
 struct tipc_subscriber {
-	u32 port_ref;
-	spinlock_t *lock;
-	struct list_head subscriber_list;
+	int conid;
+	spinlock_t lock;
 	struct list_head subscription_list;
 };
 
-/**
- * struct top_srv - TIPC network topology subscription service
- * @setup_port: reference to TIPC port that handles subscription requests
- * @subscription_count: number of active subscriptions (not subscribers!)
- * @subscriber_list: list of ports subscribing to service
- * @lock: spinlock govering access to subscriber list
- */
-struct top_srv {
-	u32 setup_port;
-	atomic_t subscription_count;
-	struct list_head subscriber_list;
-	spinlock_t lock;
+static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
+				  void *usr_data, void *buf, size_t len);
+static void *subscr_named_msg_event(int conid);
+static void subscr_conn_shutdown_event(int conid, void *usr_data);
+
+static atomic_t subscription_count = ATOMIC_INIT(0);
+
+static struct sockaddr_tipc topsrv_addr __read_mostly = {
+	.family			= AF_TIPC,
+	.addrtype		= TIPC_ADDR_NAMESEQ,
+	.addr.nameseq.type	= TIPC_TOP_SRV,
+	.addr.nameseq.lower	= TIPC_TOP_SRV,
+	.addr.nameseq.upper	= TIPC_TOP_SRV,
+	.scope			= TIPC_NODE_SCOPE
 };
 
-static struct top_srv topsrv;
+static struct tipc_server topsrv __read_mostly = {
+	.saddr			= &topsrv_addr,
+	.imp			= TIPC_CRITICAL_IMPORTANCE,
+	.type			= SOCK_SEQPACKET,
+	.max_rcvbuf_size	= sizeof(struct tipc_subscr),
+	.name			= "topology_server",
+	.tipc_conn_recvmsg	= subscr_conn_msg_event,
+	.tipc_conn_new		= subscr_named_msg_event,
+	.tipc_conn_shutdown	= subscr_conn_shutdown_event,
+};
 
 /**
  * htohl - convert value to endianness used by destination
@@ -81,20 +90,13 @@
 	return swap ? swab32(in) : in;
 }
 
-/**
- * subscr_send_event - send a message containing a tipc_event to the subscriber
- *
- * Note: Must not hold subscriber's server port lock, since tipc_send() will
- *       try to take the lock if the message is rejected and returned!
- */
-static void subscr_send_event(struct tipc_subscription *sub,
-			      u32 found_lower,
-			      u32 found_upper,
-			      u32 event,
-			      u32 port_ref,
+static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
+			      u32 found_upper, u32 event, u32 port_ref,
 			      u32 node)
 {
-	struct iovec msg_sect;
+	struct tipc_subscriber *subscriber = sub->subscriber;
+	struct kvec msg_sect;
+	int ret;
 
 	msg_sect.iov_base = (void *)&sub->evt;
 	msg_sect.iov_len = sizeof(struct tipc_event);
@@ -104,7 +106,10 @@
 	sub->evt.found_upper = htohl(found_upper, sub->swap);
 	sub->evt.port.ref = htohl(port_ref, sub->swap);
 	sub->evt.port.node = htohl(node, sub->swap);
-	tipc_send(sub->server_ref, 1, &msg_sect, msg_sect.iov_len);
+	ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
+				msg_sect.iov_base, msg_sect.iov_len);
+	if (ret < 0)
+		pr_err("Sending subscription event failed, no memory\n");
 }
 
 /**
@@ -147,21 +152,24 @@
 	subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
 }
 
-/**
- * subscr_timeout - subscription timeout has occurred
- */
 static void subscr_timeout(struct tipc_subscription *sub)
 {
-	struct tipc_port *server_port;
+	struct tipc_subscriber *subscriber = sub->subscriber;
 
-	/* Validate server port reference (in case subscriber is terminating) */
-	server_port = tipc_port_lock(sub->server_ref);
-	if (server_port == NULL)
+	/* The spin lock per subscriber is used to protect its members */
+	spin_lock_bh(&subscriber->lock);
+
+	/* Validate if the connection related to the subscriber is
+	 * closed (in case subscriber is terminating)
+	 */
+	if (subscriber->conid == 0) {
+		spin_unlock_bh(&subscriber->lock);
 		return;
+	}
 
 	/* Validate timeout (in case subscription is being cancelled) */
 	if (sub->timeout == TIPC_WAIT_FOREVER) {
-		tipc_port_unlock(server_port);
+		spin_unlock_bh(&subscriber->lock);
 		return;
 	}
 
@@ -171,8 +179,7 @@
 	/* Unlink subscription from subscriber */
 	list_del(&sub->subscription_list);
 
-	/* Release subscriber's server port */
-	tipc_port_unlock(server_port);
+	spin_unlock_bh(&subscriber->lock);
 
 	/* Notify subscriber of timeout */
 	subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
@@ -181,64 +188,54 @@
 	/* Now destroy subscription */
 	k_term_timer(&sub->timer);
 	kfree(sub);
-	atomic_dec(&topsrv.subscription_count);
+	atomic_dec(&subscription_count);
 }
 
 /**
  * subscr_del - delete a subscription within a subscription list
  *
- * Called with subscriber port locked.
+ * Called with subscriber lock held.
  */
 static void subscr_del(struct tipc_subscription *sub)
 {
 	tipc_nametbl_unsubscribe(sub);
 	list_del(&sub->subscription_list);
 	kfree(sub);
-	atomic_dec(&topsrv.subscription_count);
+	atomic_dec(&subscription_count);
 }
 
 /**
  * subscr_terminate - terminate communication with a subscriber
  *
- * Called with subscriber port locked.  Routine must temporarily release lock
- * to enable subscription timeout routine(s) to finish without deadlocking;
- * the lock is then reclaimed to allow caller to release it upon return.
- * (This should work even in the unlikely event some other thread creates
- * a new object reference in the interim that uses this lock; this routine will
- * simply wait for it to be released, then claim it.)
+ * Note: Must call it in process context since it might sleep.
  */
 static void subscr_terminate(struct tipc_subscriber *subscriber)
 {
-	u32 port_ref;
+	tipc_conn_terminate(&topsrv, subscriber->conid);
+}
+
+static void subscr_release(struct tipc_subscriber *subscriber)
+{
 	struct tipc_subscription *sub;
 	struct tipc_subscription *sub_temp;
 
-	/* Invalidate subscriber reference */
-	port_ref = subscriber->port_ref;
-	subscriber->port_ref = 0;
-	spin_unlock_bh(subscriber->lock);
+	spin_lock_bh(&subscriber->lock);
 
-	/* Sever connection to subscriber */
-	tipc_shutdown(port_ref);
-	tipc_deleteport(port_ref);
+	/* Invalidate subscriber reference */
+	subscriber->conid = 0;
 
 	/* Destroy any existing subscriptions for subscriber */
 	list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
 				 subscription_list) {
 		if (sub->timeout != TIPC_WAIT_FOREVER) {
+			spin_unlock_bh(&subscriber->lock);
 			k_cancel_timer(&sub->timer);
 			k_term_timer(&sub->timer);
+			spin_lock_bh(&subscriber->lock);
 		}
 		subscr_del(sub);
 	}
-
-	/* Remove subscriber from topology server's subscriber list */
-	spin_lock_bh(&topsrv.lock);
-	list_del(&subscriber->subscriber_list);
-	spin_unlock_bh(&topsrv.lock);
-
-	/* Reclaim subscriber lock */
-	spin_lock_bh(subscriber->lock);
+	spin_unlock_bh(&subscriber->lock);
 
 	/* Now destroy subscriber */
 	kfree(subscriber);
@@ -247,7 +244,7 @@
 /**
  * subscr_cancel - handle subscription cancellation request
  *
- * Called with subscriber port locked.  Routine must temporarily release lock
+ * Called with subscriber lock held. Routine must temporarily release lock
  * to enable the subscription timeout routine to finish without deadlocking;
  * the lock is then reclaimed to allow caller to release it upon return.
  *
@@ -274,10 +271,10 @@
 	/* Cancel subscription timer (if used), then delete subscription */
 	if (sub->timeout != TIPC_WAIT_FOREVER) {
 		sub->timeout = TIPC_WAIT_FOREVER;
-		spin_unlock_bh(subscriber->lock);
+		spin_unlock_bh(&subscriber->lock);
 		k_cancel_timer(&sub->timer);
 		k_term_timer(&sub->timer);
-		spin_lock_bh(subscriber->lock);
+		spin_lock_bh(&subscriber->lock);
 	}
 	subscr_del(sub);
 }
@@ -285,7 +282,7 @@
 /**
  * subscr_subscribe - create subscription for subscriber
  *
- * Called with subscriber port locked.
+ * Called with subscriber lock held.
  */
 static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
 					     struct tipc_subscriber *subscriber)
@@ -304,7 +301,7 @@
 	}
 
 	/* Refuse subscription if global limit exceeded */
-	if (atomic_read(&topsrv.subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
+	if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
 		pr_warn("Subscription rejected, limit reached (%u)\n",
 			TIPC_MAX_SUBSCRIPTIONS);
 		subscr_terminate(subscriber);
@@ -335,10 +332,10 @@
 	}
 	INIT_LIST_HEAD(&sub->nameseq_list);
 	list_add(&sub->subscription_list, &subscriber->subscription_list);
-	sub->server_ref = subscriber->port_ref;
+	sub->subscriber = subscriber;
 	sub->swap = swap;
 	memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
-	atomic_inc(&topsrv.subscription_count);
+	atomic_inc(&subscription_count);
 	if (sub->timeout != TIPC_WAIT_FOREVER) {
 		k_init_timer(&sub->timer,
 			     (Handler)subscr_timeout, (unsigned long)sub);
@@ -348,196 +345,51 @@
 	return sub;
 }
 
-/**
- * subscr_conn_shutdown_event - handle termination request from subscriber
- *
- * Called with subscriber's server port unlocked.
- */
-static void subscr_conn_shutdown_event(void *usr_handle,
-				       u32 port_ref,
-				       struct sk_buff **buf,
-				       unsigned char const *data,
-				       unsigned int size,
-				       int reason)
+/* Handle one termination request for the subscriber */
+static void subscr_conn_shutdown_event(int conid, void *usr_data)
 {
-	struct tipc_subscriber *subscriber = usr_handle;
-	spinlock_t *subscriber_lock;
-
-	if (tipc_port_lock(port_ref) == NULL)
-		return;
-
-	subscriber_lock = subscriber->lock;
-	subscr_terminate(subscriber);
-	spin_unlock_bh(subscriber_lock);
+	subscr_release((struct tipc_subscriber *)usr_data);
 }
 
-/**
- * subscr_conn_msg_event - handle new subscription request from subscriber
- *
- * Called with subscriber's server port unlocked.
- */
-static void subscr_conn_msg_event(void *usr_handle,
-				  u32 port_ref,
-				  struct sk_buff **buf,
-				  const unchar *data,
-				  u32 size)
+/* Handle one request to create a new subscription for the subscriber */
+static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
+				  void *usr_data, void *buf, size_t len)
 {
-	struct tipc_subscriber *subscriber = usr_handle;
-	spinlock_t *subscriber_lock;
+	struct tipc_subscriber *subscriber = usr_data;
 	struct tipc_subscription *sub;
 
-	/*
-	 * Lock subscriber's server port (& make a local copy of lock pointer,
-	 * in case subscriber is deleted while processing subscription request)
-	 */
-	if (tipc_port_lock(port_ref) == NULL)
-		return;
-
-	subscriber_lock = subscriber->lock;
-
-	if (size != sizeof(struct tipc_subscr)) {
-		subscr_terminate(subscriber);
-		spin_unlock_bh(subscriber_lock);
-	} else {
-		sub = subscr_subscribe((struct tipc_subscr *)data, subscriber);
-		spin_unlock_bh(subscriber_lock);
-		if (sub != NULL) {
-
-			/*
-			 * We must release the server port lock before adding a
-			 * subscription to the name table since TIPC needs to be
-			 * able to (re)acquire the port lock if an event message
-			 * issued by the subscription process is rejected and
-			 * returned.  The subscription cannot be deleted while
-			 * it is being added to the name table because:
-			 * a) the single-threading of the native API port code
-			 *    ensures the subscription cannot be cancelled and
-			 *    the subscriber connection cannot be broken, and
-			 * b) the name table lock ensures the subscription
-			 *    timeout code cannot delete the subscription,
-			 * so the subscription object is still protected.
-			 */
-			tipc_nametbl_subscribe(sub);
-		}
-	}
+	spin_lock_bh(&subscriber->lock);
+	sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
+	if (sub)
+		tipc_nametbl_subscribe(sub);
+	spin_unlock_bh(&subscriber->lock);
 }
 
-/**
- * subscr_named_msg_event - handle request to establish a new subscriber
- */
-static void subscr_named_msg_event(void *usr_handle,
-				   u32 port_ref,
-				   struct sk_buff **buf,
-				   const unchar *data,
-				   u32 size,
-				   u32 importance,
-				   struct tipc_portid const *orig,
-				   struct tipc_name_seq const *dest)
+
+/* Handle one request to establish a new subscriber */
+static void *subscr_named_msg_event(int conid)
 {
 	struct tipc_subscriber *subscriber;
-	u32 server_port_ref;
 
 	/* Create subscriber object */
 	subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
 	if (subscriber == NULL) {
 		pr_warn("Subscriber rejected, no memory\n");
-		return;
+		return NULL;
 	}
 	INIT_LIST_HEAD(&subscriber->subscription_list);
-	INIT_LIST_HEAD(&subscriber->subscriber_list);
+	subscriber->conid = conid;
+	spin_lock_init(&subscriber->lock);
 
-	/* Create server port & establish connection to subscriber */
-	tipc_createport(subscriber,
-			importance,
-			NULL,
-			NULL,
-			subscr_conn_shutdown_event,
-			NULL,
-			NULL,
-			subscr_conn_msg_event,
-			NULL,
-			&subscriber->port_ref);
-	if (subscriber->port_ref == 0) {
-		pr_warn("Subscriber rejected, unable to create port\n");
-		kfree(subscriber);
-		return;
-	}
-	tipc_connect(subscriber->port_ref, orig);
-
-	/* Lock server port (& save lock address for future use) */
-	subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
-
-	/* Add subscriber to topology server's subscriber list */
-	spin_lock_bh(&topsrv.lock);
-	list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
-	spin_unlock_bh(&topsrv.lock);
-
-	/* Unlock server port */
-	server_port_ref = subscriber->port_ref;
-	spin_unlock_bh(subscriber->lock);
-
-	/* Send an ACK- to complete connection handshaking */
-	tipc_send(server_port_ref, 0, NULL, 0);
-
-	/* Handle optional subscription request */
-	if (size != 0) {
-		subscr_conn_msg_event(subscriber, server_port_ref,
-				      buf, data, size);
-	}
+	return (void *)subscriber;
 }
 
 int tipc_subscr_start(void)
 {
-	struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV};
-	int res;
-
-	spin_lock_init(&topsrv.lock);
-	INIT_LIST_HEAD(&topsrv.subscriber_list);
-
-	res = tipc_createport(NULL,
-			      TIPC_CRITICAL_IMPORTANCE,
-			      NULL,
-			      NULL,
-			      NULL,
-			      NULL,
-			      subscr_named_msg_event,
-			      NULL,
-			      NULL,
-			      &topsrv.setup_port);
-	if (res)
-		goto failed;
-
-	res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
-	if (res) {
-		tipc_deleteport(topsrv.setup_port);
-		topsrv.setup_port = 0;
-		goto failed;
-	}
-
-	return 0;
-
-failed:
-	pr_err("Failed to create subscription service\n");
-	return res;
+	return tipc_server_start(&topsrv);
 }
 
 void tipc_subscr_stop(void)
 {
-	struct tipc_subscriber *subscriber;
-	struct tipc_subscriber *subscriber_temp;
-	spinlock_t *subscriber_lock;
-
-	if (topsrv.setup_port) {
-		tipc_deleteport(topsrv.setup_port);
-		topsrv.setup_port = 0;
-
-		list_for_each_entry_safe(subscriber, subscriber_temp,
-					 &topsrv.subscriber_list,
-					 subscriber_list) {
-			subscriber_lock = subscriber->lock;
-			spin_lock_bh(subscriber_lock);
-			subscr_terminate(subscriber);
-			spin_unlock_bh(subscriber_lock);
-		}
-	}
+	tipc_server_stop(&topsrv);
 }
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index 218d2e0..43e6d63 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -2,7 +2,7 @@
  * net/tipc/subscr.h: Include file for TIPC network topology service
  *
  * Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005-2007, Wind River Systems
+ * Copyright (c) 2005-2007, 2012-2013, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -37,10 +37,14 @@
 #ifndef _TIPC_SUBSCR_H
 #define _TIPC_SUBSCR_H
 
+#include "server.h"
+
 struct tipc_subscription;
+struct tipc_subscriber;
 
 /**
  * struct tipc_subscription - TIPC network topology subscription object
+ * @subscriber: pointer to its subscriber
  * @seq: name sequence associated with subscription
  * @timeout: duration of subscription (in ms)
  * @filter: event filtering to be done for subscription
@@ -52,13 +56,13 @@
  * @evt: template for events generated by subscription
  */
 struct tipc_subscription {
+	struct tipc_subscriber *subscriber;
 	struct tipc_name_seq seq;
 	u32 timeout;
 	u32 filter;
 	struct timer_list timer;
 	struct list_head nameseq_list;
 	struct list_head subscription_list;
-	u32 server_ref;
 	int swap;
 	struct tipc_event evt;
 };