rxflow remove recursion and simplify

Signed-off-by: Andy Green <andy.green@linaro.org>
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index bcde2ac..c96db24 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -841,6 +841,7 @@
 	struct timeval tv;
 	int timed_out = 0;
 	int our_fd = 0;
+	char draining_flow = 0;
 
 #ifndef LWS_NO_EXTENSIONS
 	int more = 1;
@@ -984,14 +985,25 @@
 		/* the guy requested a callback when it was OK to write */
 
 		if ((pollfd->revents & POLLOUT) &&
-					    wsi->state == WSI_STATE_ESTABLISHED)
-			if (lws_handle_POLLOUT_event(context, wsi,
-								  pollfd) < 0) {
+			wsi->state == WSI_STATE_ESTABLISHED &&
+			   lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
 				lwsl_info("libwebsocket_service_fd: closing\n");
 				goto close_and_handled;
 			}
 
 
+		if (wsi->u.ws.rxflow_buffer &&
+			      (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) {
+			lwsl_info("draining rxflow\n");
+			/* well, drain it */
+			eff_buf.token = (char *)wsi->u.ws.rxflow_buffer +
+						wsi->u.ws.rxflow_pos;
+			eff_buf.token_len = wsi->u.ws.rxflow_len -
+						wsi->u.ws.rxflow_pos;
+			draining_flow = 1;
+			goto drain;
+		}
+
 		/* any incoming data ready? */
 
 		if (!(pollfd->revents & POLLIN))
@@ -1041,6 +1053,7 @@
 		 */
 
 		eff_buf.token = (char *)context->service_buffer;
+drain:
 #ifndef LWS_NO_EXTENSIONS
 		more = 1;
 		while (more) {
@@ -1079,6 +1092,14 @@
 			eff_buf.token_len = 0;
 		}
 #endif
+		if (draining_flow && wsi->u.ws.rxflow_buffer &&
+				 wsi->u.ws.rxflow_pos == wsi->u.ws.rxflow_len) {
+			lwsl_info("flow buffer: drained\n");
+			free(wsi->u.ws.rxflow_buffer);
+			wsi->u.ws.rxflow_buffer = NULL;
+			/* having drained the rxflow buffer, can rearm POLLIN */
+			_libwebsocket_rx_flow_control(wsi);
+		}
 
 #ifdef LWS_OPENSSL_SUPPORT
 		if (wsi->ssl && SSL_pending(wsi->ssl))
@@ -1099,10 +1120,9 @@
 	goto handled;
 
 close_and_handled:
-	libwebsocket_close_and_free_session(
-		context, wsi,
-			    LWS_CLOSE_STATUS_NOSTATUS);
-	n = 0;
+	libwebsocket_close_and_free_session(context, wsi,
+						LWS_CLOSE_STATUS_NOSTATUS);
+	n = 1;
 
 handled:
 	pollfd->revents = 0;
@@ -1249,6 +1269,7 @@
 libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
 {
 	int n;
+	int m;
 
 	/* stay dead once we are dead */
 
@@ -1266,11 +1287,17 @@
 
 	/* any socket with events to service? */
 
-	for (n = 0; n < context->fds_count; n++)
-		if (context->fds[n].revents)
-			if (libwebsocket_service_fd(context,
-							&context->fds[n]) < 0)
-				return -1;
+	for (n = 0; n < context->fds_count; n++) {
+		if (!context->fds[n].revents)
+			continue;
+		m = libwebsocket_service_fd(context, &context->fds[n]);
+		if (m < 0)
+			return -1;
+		/* if something closed, retry this slot */
+		if (m)
+			n--;
+	}
+
 	return 0;
 }
 
@@ -1479,7 +1506,7 @@
 
 #ifdef LWS_NO_SERVER
 int
-_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
+_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
 {
 	return 0;
 }
@@ -1488,34 +1515,33 @@
 _libwebsocket_rx_flow_control(struct libwebsocket *wsi)
 {
 	struct libwebsocket_context *context = wsi->protocol->owning_server;
-	int n;
 
-	if (!(wsi->u.ws.rxflow_change_to & 2))
+	/* there is no pending change */
+	if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
 		return 0;
 
-	wsi->u.ws.rxflow_change_to &= ~2;
-
-	lwsl_info("rxflow: wsi %p change_to %d\n",
-					wsi, wsi->u.ws.rxflow_change_to);
-
-	/* if we're letting it come again, did we interrupt anything? */
-	if ((wsi->u.ws.rxflow_change_to & 1) && wsi->u.ws.rxflow_buffer) {
-		n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
-		if (n < 0) {
-			lwsl_info("libwebsocket_rx_flow_control: close req\n");
-			return -1;
-		}
-		if (n)
-			/* oh he stuck again, do nothing */
-			return 0;
+	/* stuff is still buffered, not ready to really accept new input */
+	if (wsi->u.ws.rxflow_buffer) {
+		/* get ourselves called back to deal with stashed buffer */
+		libwebsocket_callback_on_writable(context, wsi);
+		return 0;
 	}
 
-	if (wsi->u.ws.rxflow_change_to & 1)
+	/* pending is cleared, we can change rxflow state */
+
+	wsi->u.ws.rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
+
+	lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
+			      wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW);
+
+	/* adjust the pollfd for this wsi */
+
+	if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
 		context->fds[wsi->position_in_fds_table].events |= POLLIN;
 	else
 		context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
 
-	if (wsi->u.ws.rxflow_change_to & 1)
+	if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
 		/* external POLL support via protocol 0 */
 		context->protocols[0].callback(context, wsi,
 			LWS_CALLBACK_SET_MODE_POLL_FD,
@@ -1544,7 +1570,11 @@
 int
 libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
 {
-	wsi->u.ws.rxflow_change_to = 2 | !!enable;
+	/* compare the normalized flag, exactly as it is stored below */
+	if (!!enable == (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW))
+		return 0;
+	lwsl_info("libwebsocket_rx_flow_control(0x%p, %d)\n", wsi, enable);
+	wsi->u.ws.rxflow_change_to = LWS_RXFLOW_PENDING_CHANGE | !!enable;
 
 	return 0;
 }