solve flowcontrol problems

Problems with rx flow control implementation were the underlying cause
of the connection stalling issue that was covered up with the udelay()
patch that was removed recently.

This get rx flow control working properly and corrects problems with
fifo management in the test server mirror protocol code too.

The rxfow control api has been changed to just set a flag, so it's very cheap
to call from user code.  After the callbacks that might use the rxflow control
api the flag is checked and any pending actions done.

rx flow control now stops any rx packet coming immediately, with compessed
connections "just what was left in the pipe" might be hundreds of KBytes.  To
implement that the current packet being decoded is copied into a malloc'd buffer
by the rx processing code now.

When rxflow is allows to come again, the buffer is drained and freed before any
new packet content is accepted.

Signed-off-by: Andy Green <andy.green@linaro.org>
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index add3bd7..99ce9e8 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -371,6 +371,9 @@
 	if (wsi->c_address)
 		free(wsi->c_address);
 
+	if (wsi->rxflow_buffer)
+		free(wsi->rxflow_buffer);
+
 /*	lwsl_info("closing fd=%d\n", wsi->sock); */
 
 #ifdef LWS_OPENSSL_SUPPORT
@@ -660,7 +663,8 @@
 	else
 		n = LWS_CALLBACK_SERVER_WRITEABLE;
 
-	wsi->protocol->callback(context, wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
+	user_callback_handle_rxflow(wsi->protocol->callback, context,
+		wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
 
 	return 0;
 }
@@ -902,7 +906,7 @@
 					       LWS_CLOSE_STATUS_NOSTATUS);
 		else
 			if (wsi->state == WSI_STATE_HTTP && wsi->protocol->callback)
-				if (wsi->protocol->callback(context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
+				if (user_callback_handle_rxflow(wsi->protocol->callback, context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
 								wsi->filepath, wsi->filepos))
 					libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
 		break;
@@ -1117,7 +1121,7 @@
 
 			/* broadcast it to this connection */
 
-			new_wsi->protocol->callback(context, new_wsi,
+			user_callback_handle_rxflow(new_wsi->protocol->callback, context, new_wsi,
 				LWS_CALLBACK_BROADCAST,
 				new_wsi->user_space,
 				buf + LWS_SEND_BUFFER_PRE_PADDING, len);
@@ -1531,6 +1535,51 @@
 	return wsi->sock;
 }
 
+
+int
+_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
+{
+	struct libwebsocket_context *context = wsi->protocol->owning_server;
+	int n;
+
+	if (!(wsi->rxflow_change_to & 2))
+		return 0;
+
+	wsi->rxflow_change_to &= ~2;
+
+	lwsl_info("rxflow: wsi %p change_to %d\n", wsi, wsi->rxflow_change_to);
+
+	/* if we're letting it come again, did we interrupt anything? */
+	if ((wsi->rxflow_change_to & 1) && wsi->rxflow_buffer) {
+		n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
+		if (n < 0) {
+			libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
+			return -1;
+		}
+		if (n)
+			/* oh he stuck again, do nothing */
+			return 0;
+	}
+
+	if (wsi->rxflow_change_to & 1)
+		context->fds[wsi->position_in_fds_table].events |= POLLIN;
+	else
+		context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
+
+	if (wsi->rxflow_change_to & 1)
+		/* external POLL support via protocol 0 */
+		context->protocols[0].callback(context, wsi,
+			LWS_CALLBACK_SET_MODE_POLL_FD,
+			(void *)(long)wsi->sock, NULL, POLLIN);
+	else
+		/* external POLL support via protocol 0 */
+		context->protocols[0].callback(context, wsi,
+			LWS_CALLBACK_CLEAR_MODE_POLL_FD,
+			(void *)(long)wsi->sock, NULL, POLLIN);
+
+	return 1;
+}
+
 /**
  * libwebsocket_rx_flow_control() - Enable and disable socket servicing for
  *				receieved packets.
@@ -1545,36 +1594,12 @@
 int
 libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
 {
-	struct libwebsocket_context *context = wsi->protocol->owning_server;
-	int n;
+	wsi->rxflow_change_to = 2 | !!enable;
 
-	for (n = 0; n < context->fds_count; n++)
-		if (context->fds[n].fd == wsi->sock) {
-			if (enable)
-				context->fds[n].events |= POLLIN;
-			else
-				context->fds[n].events &= ~POLLIN;
-
-			return 0;
-		}
-
-	if (enable)
-		/* external POLL support via protocol 0 */
-		context->protocols[0].callback(context, wsi,
-			LWS_CALLBACK_SET_MODE_POLL_FD,
-			(void *)(long)wsi->sock, NULL, POLLIN);
-	else
-		/* external POLL support via protocol 0 */
-		context->protocols[0].callback(context, wsi,
-			LWS_CALLBACK_CLEAR_MODE_POLL_FD,
-			(void *)(long)wsi->sock, NULL, POLLIN);
-
-#if 0
-	lwsl_err("libwebsocket_rx_flow_control unable to find socket\n");
-#endif
-	return 1;
+	return 0;
 }
 
+
 /**
  * libwebsocket_canonical_hostname() - returns this host's hostname
  *
@@ -1630,6 +1655,23 @@
 }
 #endif
 
+int user_callback_handle_rxflow(callback_function callback_function,
+		struct libwebsocket_context * context,
+			struct libwebsocket *wsi,
+			 enum libwebsocket_callback_reasons reason, void *user,
+							  void *in, size_t len)
+{
+	int n;
+
+	n = callback_function(context, wsi, reason, user, in, len);
+	if (n < 0)
+		return n;
+
+	_libwebsocket_rx_flow_control(wsi);
+
+	return 0;
+}
+
 
 /**
  * libwebsocket_create_context() - Create the websocket handler
@@ -2366,7 +2408,8 @@
 			if (wsi->protocol != protocol)
 				continue;
 
-			wsi->protocol->callback(context, wsi,
+			user_callback_handle_rxflow(wsi->protocol->callback,
+				 context, wsi,
 				 LWS_CALLBACK_BROADCAST,
 				 wsi->user_space,
 				 buf, len);