[SCTP]: Implement SCTP_FRAGMENT_INTERLEAVE socket option

This option was introduced in draft-ietf-tsvwg-sctpsocket-13.  It
prevents head-of-line blocking in the case of a one-to-many endpoint.
Applications enabling this option really must enable the SCTP_SNDRCV
event so that they know where the data belongs.  Based on an
earlier patch by Ivan Skytte Jørgensen.

Additionally, this functionality now permits multiple associations
on the same endpoint to enter Partial Delivery.  Applications should
be extra careful, when using this functionality, to track EOR indicators.

Signed-off-by: Vlad Yasevich <vladislav.yasevich@hp.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/sctp/ulpqueue.c b/net/sctp/ulpqueue.c
index b29e3e4..ac80c34 100644
--- a/net/sctp/ulpqueue.c
+++ b/net/sctp/ulpqueue.c
@@ -138,18 +138,42 @@
 /* Clear the partial delivery mode for this socket.   Note: This
  * assumes that no association is currently in partial delivery mode.
  */
-int sctp_clear_pd(struct sock *sk)
+int sctp_clear_pd(struct sock *sk, struct sctp_association *asoc)
 {
 	struct sctp_sock *sp = sctp_sk(sk);
 
-	sp->pd_mode = 0;
-	if (!skb_queue_empty(&sp->pd_lobby)) {
-		struct list_head *list;
-		sctp_skb_list_tail(&sp->pd_lobby, &sk->sk_receive_queue);
-		list = (struct list_head *)&sctp_sk(sk)->pd_lobby;
-		INIT_LIST_HEAD(list);
-		return 1;
+	if (atomic_dec_and_test(&sp->pd_mode)) {
+		/* This means there are no other associations in PD, so
+		 * we can go ahead and clear out the lobby in one shot
+		 */
+		if (!skb_queue_empty(&sp->pd_lobby)) {
+			struct list_head *list;
+			sctp_skb_list_tail(&sp->pd_lobby, &sk->sk_receive_queue);
+			list = (struct list_head *)&sctp_sk(sk)->pd_lobby;
+			INIT_LIST_HEAD(list);
+			return 1;
+		}
+	} else {
+		/* There are other associations in PD, so we only need to
+		 * pull stuff out of the lobby that belongs to the
+		 * association that is exiting PD (all of its notifications
+		 * are posted here).
+		 */
+		if (!skb_queue_empty(&sp->pd_lobby) && asoc) {
+			struct sk_buff *skb, *tmp;
+			struct sctp_ulpevent *event;
+
+			sctp_skb_for_each(skb, &sp->pd_lobby, tmp) {
+				event = sctp_skb2event(skb);
+				if (event->asoc == asoc) {
+					__skb_unlink(skb, &sp->pd_lobby);
+					__skb_queue_tail(&sk->sk_receive_queue,
+							 skb);
+				}
+			}
+		}
 	}
+
 	return 0;
 }
 
@@ -157,7 +181,7 @@
 static int sctp_ulpq_clear_pd(struct sctp_ulpq *ulpq)
 {
 	ulpq->pd_mode = 0;
-	return sctp_clear_pd(ulpq->asoc->base.sk);
+	return sctp_clear_pd(ulpq->asoc->base.sk, ulpq->asoc);
 }
 
 /* If the SKB of 'event' is on a list, it is the first such member
@@ -187,25 +211,35 @@
 	 * the association the cause of the partial delivery.
 	 */
 
-	if (!sctp_sk(sk)->pd_mode) {
+	if (atomic_read(&sctp_sk(sk)->pd_mode) == 0) {
 		queue = &sk->sk_receive_queue;
-	} else if (ulpq->pd_mode) {
-		/* If the association is in partial delivery, we
-		 * need to finish delivering the partially processed
-		 * packet before passing any other data.  This is
-		 * because we don't truly support stream interleaving.
-		 */
-		if ((event->msg_flags & MSG_NOTIFICATION) ||
-		    (SCTP_DATA_NOT_FRAG ==
-			    (event->msg_flags & SCTP_DATA_FRAG_MASK)))
-			queue = &sctp_sk(sk)->pd_lobby;
-		else {
-			clear_pd = event->msg_flags & MSG_EOR;
-			queue = &sk->sk_receive_queue;
+	} else {
+		if (ulpq->pd_mode) {
+			/* If the association is in partial delivery, we
+			 * need to finish delivering the partially processed
+			 * packet before passing any other data.  This is
+			 * because we don't truly support stream interleaving.
+			 */
+			if ((event->msg_flags & MSG_NOTIFICATION) ||
+			    (SCTP_DATA_NOT_FRAG ==
+				    (event->msg_flags & SCTP_DATA_FRAG_MASK)))
+				queue = &sctp_sk(sk)->pd_lobby;
+			else {
+				clear_pd = event->msg_flags & MSG_EOR;
+				queue = &sk->sk_receive_queue;
+			}
+		} else {
+			/*
+			 * If fragment interleave is enabled, we
+			 * can queue this to the receive queue instead
+			 * of the lobby.
+			 */
+			if (sctp_sk(sk)->frag_interleave)
+				queue = &sk->sk_receive_queue;
+			else
+				queue = &sctp_sk(sk)->pd_lobby;
 		}
-	} else
-		queue = &sctp_sk(sk)->pd_lobby;
-
+	}
 
 	/* If we are harvesting multiple skbs they will be
 	 * collected on a list.
@@ -826,18 +860,29 @@
 {
 	struct sctp_ulpevent *event;
 	struct sctp_association *asoc;
+	struct sctp_sock *sp;
 
 	asoc = ulpq->asoc;
+	sp = sctp_sk(asoc->base.sk);
 
-	/* Are we already in partial delivery mode?  */
-	if (!sctp_sk(asoc->base.sk)->pd_mode) {
+	/* If the association is already in Partial Delivery mode
+	 * we have nothing to do.
+	 */
+	if (ulpq->pd_mode)
+		return;
 
+	/* If the user enabled fragment interleave socket option,
+	 * multiple associations can enter partial delivery.
+	 * Otherwise, we can only enter partial delivery if the
+	 * socket is not in partial delivery mode.
+	 */
+	if (sp->frag_interleave || atomic_read(&sp->pd_mode) == 0) {
 		/* Is partial delivery possible?  */
 		event = sctp_ulpq_retrieve_first(ulpq);
 		/* Send event to the ULP.   */
 		if (event) {
 			sctp_ulpq_tail_event(ulpq, event);
-			sctp_sk(asoc->base.sk)->pd_mode = 1;
+			atomic_inc(&sp->pd_mode);
 			ulpq->pd_mode = 1;
 			return;
 		}