tipc: involve reference counter for subscriber

At present subscriber's lock is used to protect the subscription list
of subscriber as well as subscriptions linked into the list. While one
or all subscriptions are deleted through iterating the list, the
subscriber's lock must be held. Meanwhile, as deletion of subscription
may happen in subscription timer's handler, the lock must be grabbed
in the function as well. When subscription's timer is terminated with
del_timer_sync() during above iteration, subscriber's lock has to be
temporarily released, otherwise, deadlock may occur. However, the
temporary release may cause the double free of a subscription as the
subscription is not disconnected from the subscription list.

Now if a reference counter is introduced to subscriber, subscription's
timer can be asynchronously stopped with del_timer(). As a result, the
issue is not only able to be fixed, but also relevant code is pretty
readable and understandable.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Jon Maloy <jon.maloy@ericson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index d0dbde4..9b24c9c 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -40,16 +40,21 @@
 
 /**
  * struct tipc_subscriber - TIPC network topology subscriber
+ * @kref: reference counter to tipc_subscription object
  * @conid: connection identifier to server connecting to subscriber
  * @lock: control access to subscriber
  * @subscrp_list: list of subscription objects for this subscriber
  */
 struct tipc_subscriber {
+	struct kref kref;
 	int conid;
 	spinlock_t lock;
 	struct list_head subscrp_list;
 };
 
+static void tipc_subscrp_delete(struct tipc_subscription *sub);
+static void tipc_subscrb_put(struct tipc_subscriber *subscriber);
+
 /**
  * htohl - convert value to endianness used by destination
  * @in: value to convert
@@ -116,32 +121,69 @@
 {
 	struct tipc_subscription *sub = (struct tipc_subscription *)data;
 	struct tipc_subscriber *subscriber = sub->subscriber;
-	struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
-
-	/* The spin lock per subscriber is used to protect its members */
-	spin_lock_bh(&subscriber->lock);
-
-	/* Validate timeout (in case subscription is being cancelled) */
-	if (sub->timeout == TIPC_WAIT_FOREVER) {
-		spin_unlock_bh(&subscriber->lock);
-		return;
-	}
-
-	/* Unlink subscription from name table */
-	tipc_nametbl_unsubscribe(sub);
-
-	/* Unlink subscription from subscriber */
-	list_del(&sub->subscrp_list);
-
-	spin_unlock_bh(&subscriber->lock);
 
 	/* Notify subscriber of timeout */
 	tipc_subscrp_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
 				TIPC_SUBSCR_TIMEOUT, 0, 0);
 
-	/* Now destroy subscription */
-	kfree(sub);
-	atomic_dec(&tn->subscription_count);
+	spin_lock_bh(&subscriber->lock);
+	tipc_subscrp_delete(sub);
+	spin_unlock_bh(&subscriber->lock);
+
+	tipc_subscrb_put(subscriber);
+}
+
+static void tipc_subscrb_kref_release(struct kref *kref)
+{
+	struct tipc_subscriber *subcriber = container_of(kref,
+					    struct tipc_subscriber, kref);
+
+	kfree(subcriber);
+}
+
+static void tipc_subscrb_put(struct tipc_subscriber *subscriber)
+{
+	kref_put(&subscriber->kref, tipc_subscrb_kref_release);
+}
+
+static void tipc_subscrb_get(struct tipc_subscriber *subscriber)
+{
+	kref_get(&subscriber->kref);
+}
+
+static struct tipc_subscriber *tipc_subscrb_create(int conid)
+{
+	struct tipc_subscriber *subscriber;
+
+	subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
+	if (!subscriber) {
+		pr_warn("Subscriber rejected, no memory\n");
+		return NULL;
+	}
+	kref_init(&subscriber->kref);
+	INIT_LIST_HEAD(&subscriber->subscrp_list);
+	subscriber->conid = conid;
+	spin_lock_init(&subscriber->lock);
+
+	return subscriber;
+}
+
+static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
+{
+	struct tipc_subscription *sub, *temp;
+
+	spin_lock_bh(&subscriber->lock);
+	/* Destroy any existing subscriptions for subscriber */
+	list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
+				 subscrp_list) {
+		if (del_timer(&sub->timer)) {
+			tipc_subscrp_delete(sub);
+			tipc_subscrb_put(subscriber);
+		}
+	}
+	spin_unlock_bh(&subscriber->lock);
+
+	tipc_subscrb_put(subscriber);
 }
 
 static void tipc_subscrp_delete(struct tipc_subscription *sub)
@@ -154,80 +196,22 @@
 	atomic_dec(&tn->subscription_count);
 }
 
-static struct tipc_subscriber *tipc_subscrb_create(int conid)
-{
-	struct tipc_subscriber *subscriber;
-
-	subscriber = kzalloc(sizeof(*subscriber), GFP_ATOMIC);
-	if (!subscriber) {
-		pr_warn("Subscriber rejected, no memory\n");
-		return NULL;
-	}
-	INIT_LIST_HEAD(&subscriber->subscrp_list);
-	subscriber->conid = conid;
-	spin_lock_init(&subscriber->lock);
-
-	return subscriber;
-}
-
-static void tipc_subscrb_delete(struct tipc_subscriber *subscriber)
-{
-	struct tipc_subscription *sub;
-	struct tipc_subscription *sub_temp;
-
-	spin_lock_bh(&subscriber->lock);
-
-	/* Destroy any existing subscriptions for subscriber */
-	list_for_each_entry_safe(sub, sub_temp, &subscriber->subscrp_list,
-				 subscrp_list) {
-		if (sub->timeout != TIPC_WAIT_FOREVER) {
-			spin_unlock_bh(&subscriber->lock);
-			del_timer_sync(&sub->timer);
-			spin_lock_bh(&subscriber->lock);
-		}
-		tipc_subscrp_delete(sub);
-	}
-	spin_unlock_bh(&subscriber->lock);
-
-	/* Now destroy subscriber */
-	kfree(subscriber);
-}
-
-/**
- * tipc_subscrp_cancel - handle subscription cancellation request
- *
- * Called with subscriber lock held. Routine must temporarily release lock
- * to enable the subscription timeout routine to finish without deadlocking;
- * the lock is then reclaimed to allow caller to release it upon return.
- *
- * Note that fields of 's' use subscriber's endianness!
- */
 static void tipc_subscrp_cancel(struct tipc_subscr *s,
 				struct tipc_subscriber *subscriber)
 {
-	struct tipc_subscription *sub;
-	struct tipc_subscription *sub_temp;
-	int found = 0;
+	struct tipc_subscription *sub, *temp;
 
 	/* Find first matching subscription, exit if not found */
-	list_for_each_entry_safe(sub, sub_temp, &subscriber->subscrp_list,
+	list_for_each_entry_safe(sub, temp, &subscriber->subscrp_list,
 				 subscrp_list) {
 		if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
-			found = 1;
+			if (del_timer(&sub->timer)) {
+				tipc_subscrp_delete(sub);
+				tipc_subscrb_put(subscriber);
+			}
 			break;
 		}
 	}
-	if (!found)
-		return;
-
-	/* Cancel subscription timer (if used), then delete subscription */
-	if (sub->timeout != TIPC_WAIT_FOREVER) {
-		sub->timeout = TIPC_WAIT_FOREVER;
-		spin_unlock_bh(&subscriber->lock);
-		del_timer_sync(&sub->timer);
-		spin_lock_bh(&subscriber->lock);
-	}
-	tipc_subscrp_delete(sub);
 }
 
 static int tipc_subscrp_create(struct net *net, struct tipc_subscr *s,
@@ -281,11 +265,11 @@
 	sub->swap = swap;
 	memcpy(&sub->evt.s, s, sizeof(*s));
 	atomic_inc(&tn->subscription_count);
-	if (sub->timeout != TIPC_WAIT_FOREVER) {
-		setup_timer(&sub->timer, tipc_subscrp_timeout,
-			    (unsigned long)sub);
-		mod_timer(&sub->timer, jiffies + sub->timeout);
-	}
+	setup_timer(&sub->timer, tipc_subscrp_timeout, (unsigned long)sub);
+	if (sub->timeout != TIPC_WAIT_FOREVER)
+		sub->timeout += jiffies;
+	if (!mod_timer(&sub->timer, sub->timeout))
+		tipc_subscrb_get(subscriber);
 	*sub_p = sub;
 	return 0;
 }