rxflow remove recursion and simplify
Signed-off-by: Andy Green <andy.green@linaro.org>
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index bcde2ac..c96db24 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -841,6 +841,7 @@
struct timeval tv;
int timed_out = 0;
int our_fd = 0;
+ char draining_flow = 0;
#ifndef LWS_NO_EXTENSIONS
int more = 1;
@@ -984,14 +985,25 @@
/* the guy requested a callback when it was OK to write */
if ((pollfd->revents & POLLOUT) &&
- wsi->state == WSI_STATE_ESTABLISHED)
- if (lws_handle_POLLOUT_event(context, wsi,
- pollfd) < 0) {
+ wsi->state == WSI_STATE_ESTABLISHED &&
+ lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
lwsl_info("libwebsocket_service_fd: closing\n");
goto close_and_handled;
}
+ if (wsi->u.ws.rxflow_buffer &&
+ (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) {
+ lwsl_info("draining rxflow\n");
+ /* well, drain it */
+ eff_buf.token = (char *)wsi->u.ws.rxflow_buffer +
+ wsi->u.ws.rxflow_pos;
+ eff_buf.token_len = wsi->u.ws.rxflow_len -
+ wsi->u.ws.rxflow_pos;
+ draining_flow = 1;
+ goto drain;
+ }
+
/* any incoming data ready? */
if (!(pollfd->revents & POLLIN))
@@ -1041,6 +1053,7 @@
*/
eff_buf.token = (char *)context->service_buffer;
+drain:
#ifndef LWS_NO_EXTENSIONS
more = 1;
while (more) {
@@ -1079,6 +1092,14 @@
eff_buf.token_len = 0;
}
#endif
+ if (draining_flow && wsi->u.ws.rxflow_buffer &&
+ wsi->u.ws.rxflow_pos == wsi->u.ws.rxflow_len) {
+ lwsl_info("flow buffer: drained\n");
+ free(wsi->u.ws.rxflow_buffer);
+ wsi->u.ws.rxflow_buffer = NULL;
+ /* having drained the rxflow buffer, can rearm POLLIN */
+ _libwebsocket_rx_flow_control(wsi);
+ }
#ifdef LWS_OPENSSL_SUPPORT
if (wsi->ssl && SSL_pending(wsi->ssl))
@@ -1099,10 +1120,9 @@
goto handled;
close_and_handled:
- libwebsocket_close_and_free_session(
- context, wsi,
- LWS_CLOSE_STATUS_NOSTATUS);
- n = 0;
+ libwebsocket_close_and_free_session(context, wsi,
+ LWS_CLOSE_STATUS_NOSTATUS);
+ n = 1;
handled:
pollfd->revents = 0;
@@ -1249,6 +1269,7 @@
libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
{
int n;
+ int m;
/* stay dead once we are dead */
@@ -1266,11 +1287,17 @@
/* any socket with events to service? */
- for (n = 0; n < context->fds_count; n++)
- if (context->fds[n].revents)
- if (libwebsocket_service_fd(context,
- &context->fds[n]) < 0)
- return -1;
+ for (n = 0; n < context->fds_count; n++) {
+ if (!context->fds[n].revents)
+ continue;
+ m = libwebsocket_service_fd(context, &context->fds[n]);
+ if (m < 0)
+ return -1;
+ /* if something closed, retry this slot */
+ if (m)
+ n--;
+ }
+
return 0;
}
@@ -1479,7 +1506,7 @@
#ifdef LWS_NO_SERVER
int
-_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
+_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
{
return 0;
}
@@ -1488,34 +1515,33 @@
_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
{
struct libwebsocket_context *context = wsi->protocol->owning_server;
- int n;
- if (!(wsi->u.ws.rxflow_change_to & 2))
+ /* there is no pending change */
+ if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
return 0;
- wsi->u.ws.rxflow_change_to &= ~2;
-
- lwsl_info("rxflow: wsi %p change_to %d\n",
- wsi, wsi->u.ws.rxflow_change_to);
-
- /* if we're letting it come again, did we interrupt anything? */
- if ((wsi->u.ws.rxflow_change_to & 1) && wsi->u.ws.rxflow_buffer) {
- n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
- if (n < 0) {
- lwsl_info("libwebsocket_rx_flow_control: close req\n");
- return -1;
- }
- if (n)
- /* oh he stuck again, do nothing */
- return 0;
+ /* stuff is still buffered, not ready to really accept new input */
+ if (wsi->u.ws.rxflow_buffer) {
+ /* get ourselves called back to deal with stashed buffer */
+ libwebsocket_callback_on_writable(context, wsi);
+ return 0;
}
- if (wsi->u.ws.rxflow_change_to & 1)
+ /* pending is cleared, we can change rxflow state */
+
+ wsi->u.ws.rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
+
+ lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
+ wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW);
+
+ /* adjust the pollfd for this wsi */
+
+ if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
context->fds[wsi->position_in_fds_table].events |= POLLIN;
else
context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
- if (wsi->u.ws.rxflow_change_to & 1)
+ if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_SET_MODE_POLL_FD,
@@ -1544,7 +1570,11 @@
int
libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
{
- wsi->u.ws.rxflow_change_to = 2 | !!enable;
+ if (!!enable == (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW))
+ return 0;
+
+ lwsl_info("libwebsocket_rx_flow_control(0x%p, %d)\n", wsi, enable);
+ wsi->u.ws.rxflow_change_to = LWS_RXFLOW_PENDING_CHANGE | !!enable;
return 0;
}