replace hashtable polltable management

This rips out the connection hashtable implementation along with
MAX_CLIENTS and replaces it with a dynamically allocated fds array
and lookup table along the same lines as the new extpoll implementation
from Edwin van den Oetelaar.

It detects the max number of file descriptors possible at context init
time and allocates accordingly; this can be externally controlled by
ulimit and the server run as a specific user to facilitate targeting
specific ulimit rules at it.

Many operations that translated between socket descriptors and struct
websocket or pollfd objects have had iteration removed by this patch
and under load will be a lot faster.

Signed-off-by: Andy Green <andy.green@linaro.org>
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index c456d21..6b2f2db 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -61,57 +61,71 @@
 	"CLIENT",
 };
 
-/* file descriptor hash management */
-
-struct libwebsocket *
-wsi_from_fd(struct libwebsocket_context *context, int fd)
-{
-	int h = LWS_FD_HASH(fd);
-	int n = 0;
-
-	for (n = 0; n < context->fd_hashtable[h].length; n++)
-		if (context->fd_hashtable[h].wsi[n]->sock == fd)
-			return context->fd_hashtable[h].wsi[n];
-
-	return NULL;
-}
-
 int
-insert_wsi(struct libwebsocket_context *context, struct libwebsocket *wsi)
+insert_wsi_socket_into_fds(struct libwebsocket_context *context, struct libwebsocket *wsi)
 {
-	int h = LWS_FD_HASH(wsi->sock);
-
-	if (context->fd_hashtable[h].length == MAX_CLIENTS - 1) {
-		lwsl_err("hash table overflow\n");
+	if (context->fds_count >= context->max_fds) {
+		lwsl_err("Reached limit of fds tracking (%d)\n", context->max_fds);
 		return 1;
 	}
 
-	context->fd_hashtable[h].wsi[context->fd_hashtable[h].length++] = wsi;
+	if (wsi->sock > context->max_fds) {
+		lwsl_err("Socket fd %d is beyond what we can index (%d)\n", wsi->sock, context->max_fds);
+		return 1;
+	}
+
+	assert(wsi);
+	assert(wsi->sock);
+
+	lwsl_info("insert_wsi_socket_into_fds: wsi=%p, sock=%d, fds pos=%d\n", wsi, wsi->sock, context->fds_count);
+
+	context->lws_lookup[wsi->sock] = wsi;
+	wsi->position_in_fds_table = context->fds_count;
+	context->fds[context->fds_count].fd = wsi->sock;
+	context->fds[context->fds_count].events = POLLIN;
+	context->fds[context->fds_count++].revents = 0;
+
+	/* external POLL support via protocol 0 */
+	context->protocols[0].callback(context, wsi,
+		LWS_CALLBACK_ADD_POLL_FD,
+		(void *)(long)wsi->sock, NULL, POLLIN);
 
 	return 0;
 }
 
-int
-delete_from_fd(struct libwebsocket_context *context, int fd)
+static int
+remove_wsi_socket_from_fds(struct libwebsocket_context *context, struct libwebsocket *wsi)
 {
-	int h = LWS_FD_HASH(fd);
-	int n = 0;
+	int m;
 
-	for (n = 0; n < context->fd_hashtable[h].length; n++)
-		if (context->fd_hashtable[h].wsi[n]->sock == fd) {
-			while (n < context->fd_hashtable[h].length) {
-				context->fd_hashtable[h].wsi[n] =
-					    context->fd_hashtable[h].wsi[n + 1];
-				n++;
-			}
-			context->fd_hashtable[h].length--;
+	if (!--context->fds_count)
+		goto do_ext;
 
-			return 0;
-		}
+	if (wsi->sock > context->max_fds) {
+		lwsl_err("Socket fd %d is beyond what we can index (%d)\n", wsi->sock, context->max_fds);
+		return 1;
+	}
 
-	lwsl_err("Failed to find fd %d requested for "
-						   "delete in hashtable\n", fd);
-	return 1;
+	lwsl_info("remove_wsi_socket_from_fds: wsi=%p, sock=%d, fds pos=%d\n", wsi, wsi->sock, wsi->position_in_fds_table);
+
+	m = wsi->position_in_fds_table; /* replace the contents for this */
+
+	/* have the last guy take up the vacant slot */
+	context->fds[m] = context->fds[context->fds_count]; /* vacant fds slot filled with end one */
+	/* end guy's fds_lookup entry remains unchanged (still same fd pointing to same wsi) */
+	/* end guy's "position in fds table" changed */
+	context->lws_lookup[context->fds[context->fds_count].fd]->position_in_fds_table = m;
+	/* deletion guy's lws_lookup entry needs nuking */
+	context->lws_lookup[wsi->sock] = NULL; /* no WSI for the socket of the wsi being removed*/
+	wsi->position_in_fds_table = -1; /* removed wsi has no position any more */
+
+do_ext:
+	/* remove also from external POLL support via protocol 0 */
+	if (wsi->sock)
+		context->protocols[0].callback(context, wsi,
+		    LWS_CALLBACK_DEL_POLL_FD, (void *)(long)wsi->sock, NULL, 0);
+
+	return 0;
 }
 
 #ifdef LWS_OPENSSL_SUPPORT
@@ -298,35 +312,10 @@
 
 	/*
 	 * we won't be servicing or receiving anything further from this guy
-	 * remove this fd from wsi mapping hashtable
+	 * delete socket from the internal poll list if still present
 	 */
 
-	if (wsi->sock)
-		delete_from_fd(context, wsi->sock);
-
-	/* delete it from the internal poll list if still present */
-
-	m = 0;
-	for (n = 0; n < context->fds_count; n++) {
-		if (context->fds[n].fd != wsi->sock)
-			continue;
-		m = 1;
-		while (n < context->fds_count - 1) {
-			context->fds[n] = context->fds[n + 1];
-			n++;
-		}
-		context->fds_count--;
-		/* we only have to deal with one */
-		n = context->fds_count;
-	}
-
-	if (!m)
-		lwsl_err("Failed to remove fd %d from fds array\n", wsi->sock);
-
-	/* remove also from external POLL support via protocol 0 */
-	if (wsi->sock)
-		context->protocols[0].callback(context, wsi,
-		    LWS_CALLBACK_DEL_POLL_FD, (void *)(long)wsi->sock, NULL, 0);
+	remove_wsi_socket_from_fds(context, wsi);
 
 	wsi->state = WSI_STATE_DEAD_SOCKET;
 
@@ -411,13 +400,13 @@
 void
 libwebsockets_hangup_on_client(struct libwebsocket_context *context, int fd)
 {
-	struct libwebsocket *wsi = wsi_from_fd(context, fd);
+	struct libwebsocket *wsi = context->lws_lookup[fd];
 
-	if (wsi == NULL)
-		return;
-
-	libwebsocket_close_and_free_session(context, wsi,
-						     LWS_CLOSE_STATUS_NOSTATUS);
+	if (wsi) {
+		libwebsocket_close_and_free_session(context,
+			wsi, LWS_CLOSE_STATUS_NOSTATUS);
+	} else
+		close(fd);
 }
 
 
@@ -795,10 +784,11 @@
 		/* global timeout check once per second */
 
 		for (n = 0; n < context->fds_count; n++) {
-			wsi = wsi_from_fd(context, context->fds[n].fd);
-
-			libwebsocket_service_timeout_check(context, wsi,
-								     tv.tv_sec);
+			struct libwebsocket *new_wsi = context->lws_lookup[context->fds[n].fd];
+			if (!new_wsi)
+				continue;
+			libwebsocket_service_timeout_check(context,
+				new_wsi, tv.tv_sec);
 		}
 	}
 
@@ -844,10 +834,10 @@
 
 	/* okay, what we came here to do... */
 
-	wsi = wsi_from_fd(context, pollfd->fd);
-
+	wsi = context->lws_lookup[pollfd->fd];
 	if (wsi == NULL) {
-		lwsl_debug("hm fd %d has NULL wsi\n", pollfd->fd);
+		if (pollfd->fd > 11)
+			lwsl_err("unexpected NULL wsi fd=%d fds_count=%d\n", pollfd->fd, context->fds_count);
 		return 0;
 	}
 
@@ -915,11 +905,6 @@
 		if (!(pollfd->revents & POLLIN))
 			break;
 
-		if (context->fds_count >= MAX_CLIENTS) {
-			lwsl_warn("too busy to accept new client\n");
-			break;
-		}
-
 		/* listen socket got an unencrypted connection... */
 
 		clilen = sizeof(cli_addr);
@@ -1007,23 +992,7 @@
 			lwsl_debug("accepted new conn  port %u on fd=%d\n",
 					  ntohs(cli_addr.sin_port), accept_fd);
 
-		insert_wsi(context, new_wsi);
-
-		/*
-		 * make sure NO events are seen yet on this new socket
-		 * (otherwise we inherit old fds[client].revents from
-		 * previous socket there and die mysteriously! )
-		 */
-		context->fds[context->fds_count].revents = 0;
-
-		context->fds[context->fds_count].events = POLLIN;
-		context->fds[context->fds_count++].fd = accept_fd;
-
-		/* external POLL support via protocol 0 */
-		context->protocols[0].callback(context, new_wsi,
-			LWS_CALLBACK_ADD_POLL_FD,
-			(void *)(long)accept_fd, NULL, POLLIN);
-
+		insert_wsi_socket_into_fds(context, new_wsi);
 		break;
 
 	case LWS_CONNMODE_BROADCAST_PROXY_LISTENER:
@@ -1043,12 +1012,6 @@
 			return 0;
 		}
 
-		if (context->fds_count >= MAX_CLIENTS) {
-			lwsl_err("too busy to accept new broadcast "
-							      "proxy client\n");
-			goto bail_prox_listener;
-		}
-
 		/* create a dummy wsi for the connection and add it */
 
 		new_wsi = (struct libwebsocket *)malloc(sizeof(struct libwebsocket));
@@ -1064,19 +1027,8 @@
 		/* note which protocol we are proxying */
 		new_wsi->protocol_index_for_broadcast_proxy =
 					wsi->protocol_index_for_broadcast_proxy;
-		insert_wsi(context, new_wsi);
 
-		/* add connected socket to internal poll array */
-
-		context->fds[context->fds_count].revents = 0;
-		context->fds[context->fds_count].events = POLLIN;
-		context->fds[context->fds_count++].fd = accept_fd;
-
-		/* external POLL support via protocol 0 */
-		context->protocols[0].callback(context, new_wsi,
-			LWS_CALLBACK_ADD_POLL_FD,
-			(void *)(long)accept_fd, NULL, POLLIN);
-
+		insert_wsi_socket_into_fds(context, new_wsi);
 		break;
 
 bail_prox_listener:
@@ -1126,41 +1078,40 @@
 
 		/* broadcast it to all guys with this protocol index */
 
-		for (n = 0; n < FD_HASHTABLE_MODULUS; n++) {
+		for (n = 0; n < context->fds_count; n++) {
 
-			for (m = 0; m < context->fd_hashtable[n].length; m++) {
+			new_wsi = context->lws_lookup[context->fds[n].fd];
+			if (new_wsi == NULL)
+				continue;
 
-				new_wsi = context->fd_hashtable[n].wsi[m];
+			/* only to clients we are serving to */
 
-				/* only to clients we are serving to */
+			if (new_wsi->mode != LWS_CONNMODE_WS_SERVING)
+				continue;
 
-				if (new_wsi->mode != LWS_CONNMODE_WS_SERVING)
-					continue;
+			/*
+			 * never broadcast to non-established
+			 * connection
+			 */
 
-				/*
-				 * never broadcast to non-established
-				 * connection
-				 */
+			if (new_wsi->state != WSI_STATE_ESTABLISHED)
+				continue;
 
-				if (new_wsi->state != WSI_STATE_ESTABLISHED)
-					continue;
+			/*
+			 * only broadcast to connections using
+			 * the requested protocol
+			 */
 
-				/*
-				 * only broadcast to connections using
-				 * the requested protocol
-				 */
+			if (new_wsi->protocol->protocol_index !=
+				wsi->protocol_index_for_broadcast_proxy)
+				continue;
 
-				if (new_wsi->protocol->protocol_index !=
-					wsi->protocol_index_for_broadcast_proxy)
-					continue;
+			/* broadcast it to this connection */
 
-				/* broadcast it to this connection */
-
-				new_wsi->protocol->callback(context, new_wsi,
-					LWS_CALLBACK_BROADCAST,
-					new_wsi->user_space,
-					buf + LWS_SEND_BUFFER_PRE_PADDING, len);
-			}
+			new_wsi->protocol->callback(context, new_wsi,
+				LWS_CALLBACK_BROADCAST,
+				new_wsi->user_space,
+				buf + LWS_SEND_BUFFER_PRE_PADDING, len);
 		}
 		break;
 
@@ -1303,15 +1254,13 @@
 {
 	int n;
 	int m;
-	struct libwebsocket *wsi;
 	struct libwebsocket_extension *ext;
 
-	for (n = 0; n < FD_HASHTABLE_MODULUS; n++)
-		for (m = 0; m < context->fd_hashtable[n].length; m++) {
-			wsi = context->fd_hashtable[n].wsi[m];
-			libwebsocket_close_and_free_session(context, wsi,
-						    LWS_CLOSE_STATUS_GOINGAWAY);
-		}
+	for (n = 0; n < context->fds_count; n++) {
+		struct libwebsocket *wsi = context->lws_lookup[context->fds[n].fd];
+		libwebsocket_close_and_free_session(context,
+			wsi, LWS_CLOSE_STATUS_GOINGAWAY);
+	}
 
 	/*
 	 * give all extensions a chance to clean up any per-context
@@ -1404,16 +1353,10 @@
 	if (n == 0) /* poll timeout */
 		return 0;
 
-
-
-	if (n < 0) {
-		/*
-		lwsl_err("Listen Socket dead\n");
-		*/
+	if (n < 0)
 		return -1;
-	}
 
-	/* handle accept on listening socket? */
+	/* any socket with events to service? */
 
 	for (n = 0; n < context->fds_count; n++)
 		if (context->fds[n].revents)
@@ -1498,15 +1441,13 @@
 	if (handled)
 		return 1;
 
-	for (n = 0; n < context->fds_count; n++)
-		if (context->fds[n].fd == wsi->sock) {
-			context->fds[n].events |= POLLOUT;
-			n = context->fds_count + 1;
-		}
-
-	if (n == context->fds_count)
+	if (wsi->position_in_fds_table < 0) {
 		lwsl_err("libwebsocket_callback_on_writable: "
 				      "failed to find socket %d\n", wsi->sock);
+		return -1;
+	}
+
+	context->fds[wsi->position_in_fds_table].events |= POLLOUT;
 
 	/* external POLL support via protocol 0 */
 	context->protocols[0].callback(context, wsi,
@@ -1531,18 +1472,14 @@
 {
 	struct libwebsocket_context *context = protocol->owning_server;
 	int n;
-	int m;
 	struct libwebsocket *wsi;
 
-	for (n = 0; n < FD_HASHTABLE_MODULUS; n++) {
-
-		for (m = 0; m < context->fd_hashtable[n].length; m++) {
-
-			wsi = context->fd_hashtable[n].wsi[m];
-
-			if (wsi->protocol == protocol)
-				libwebsocket_callback_on_writable(context, wsi);
-		}
+	for (n = 0; n < context->fds_count; n++) {
+		wsi = context->lws_lookup[context->fds[n].fd];
+		if (!wsi)
+			continue;
+		if (wsi->protocol == protocol)
+			libwebsocket_callback_on_writable(context, wsi);
 	}
 
 	return 0;
@@ -1766,8 +1703,6 @@
 #endif
 
 	lwsl_info("Initial logging level %d\n", log_level);
-	lwsl_info(" FD_HASHTABLE_MODULUS: %u\n", FD_HASHTABLE_MODULUS);
-	lwsl_info(" MAX_CLIENTS: %u\n", MAX_CLIENTS);
 	lwsl_info(" LWS_MAX_HEADER_NAME_LENGTH: %u\n", LWS_MAX_HEADER_NAME_LENGTH);
 	lwsl_info(" LWS_MAX_HEADER_LEN: %u\n", LWS_MAX_HEADER_LEN);
 	lwsl_info(" LWS_INITIAL_HDR_ALLOC: %u\n", LWS_INITIAL_HDR_ALLOC);
@@ -1821,16 +1756,34 @@
 	context->http_proxy_port = 0;
 	context->http_proxy_address[0] = '\0';
 	context->options = options;
+	/* to reduce this allocation, */
+	context->max_fds = getdtablesize();
+	lwsl_info(" max fd tracked: %u\n", context->max_fds);
+
+	context->fds = (struct pollfd *)malloc(sizeof(struct pollfd) * context->max_fds);
+	if (context->fds == NULL) {
+		lwsl_err("Unable to allocate fds array for %d connections\n", context->max_fds);
+		free(context);
+		return NULL;
+	}
+	context->lws_lookup = (struct libwebsocket **)malloc(sizeof(struct libwebsocke *) * context->max_fds);
+	if (context->lws_lookup == NULL) {
+		lwsl_err("Unable to allocate lws_lookup array for %d connections\n", context->max_fds);
+		free(context->fds);
+		free(context);
+		return NULL;
+	}
 	context->fds_count = 0;
 	context->extensions = extensions;
 	context->last_timeout_check_s = 0;
-    context->user_space = user;
+	context->user_space = user;
 
 #ifdef WIN32
 	context->fd_random = 0;
 #else
 	context->fd_random = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY);
 	if (context->fd_random < 0) {
+		free(context);
 		lwsl_err("Unable to open random device %s %d\n",
 				    SYSTEM_RANDOM_FILEPATH, context->fd_random);
 		return NULL;
@@ -2074,11 +2027,6 @@
 	if (lws_b64_selftest())
 		return NULL;
 
-	/* fd hashtable init */
-
-	for (n = 0; n < FD_HASHTABLE_MODULUS; n++)
-		context->fd_hashtable[n].length = 0;
-
 	/* set up our external listening socket we serve on */
 
 	if (port) {
@@ -2126,7 +2074,8 @@
 		wsi->sock = sockfd;
 		wsi->count_active_extensions = 0;
 		wsi->mode = LWS_CONNMODE_SERVER_LISTENER;
-		insert_wsi(context, wsi);
+
+		insert_wsi_socket_into_fds(context, wsi);
 
 		context->listen_service_modulo = LWS_LISTEN_SERVICE_MODULO;
 		context->listen_service_count = 0;
@@ -2134,17 +2083,6 @@
 
 		listen(sockfd, LWS_SOMAXCONN);
 		lwsl_info(" Listening on port %d\n", port);
-
-		/* list in the internal poll array - we're always first */
-
-		context->fds[context->fds_count].fd = sockfd;
-		context->fds[context->fds_count++].events = POLLIN;
-
-		/* external POLL support via protocol 0 */
-		context->protocols[0].callback(context, wsi,
-			LWS_CALLBACK_ADD_POLL_FD,
-			(void *)(long)sockfd, NULL, POLLIN);
-
 	}
 
 	/*
@@ -2226,19 +2164,8 @@
 		/* note which protocol we are proxying */
 		wsi->protocol_index_for_broadcast_proxy =
 						       context->count_protocols;
-		insert_wsi(context, wsi);
 
-		/* list in internal poll array */
-
-		context->fds[context->fds_count].fd = fd;
-		context->fds[context->fds_count].events = POLLIN;
-		context->fds[context->fds_count].revents = 0;
-		context->fds_count++;
-
-		/* external POLL support via protocol 0 */
-		context->protocols[0].callback(context, wsi,
-			LWS_CALLBACK_ADD_POLL_FD,
-			(void *)(long)fd, NULL, POLLIN);
+		insert_wsi_socket_into_fds(context, wsi);
 	}
 
 	/*
@@ -2394,7 +2321,6 @@
 {
 	struct libwebsocket_context *context = protocol->owning_server;
 	int n;
-	int m;
 	struct libwebsocket *wsi;
 
 	if (!protocol->broadcast_socket_user_fd) {
@@ -2410,33 +2336,31 @@
 		 * called in the poll thread context and are serialized.
 		 */
 
-		for (n = 0; n < FD_HASHTABLE_MODULUS; n++) {
+		for (n = 0; n < context->fds_count; n++) {
 
-			for (m = 0; m < context->fd_hashtable[n].length; m++) {
+			wsi = context->lws_lookup[context->fds[n].fd];
+			if (!wsi)
+				continue;
 
-				wsi = context->fd_hashtable[n].wsi[m];
+			if (wsi->mode != LWS_CONNMODE_WS_SERVING)
+				continue;
 
-				if (wsi->mode != LWS_CONNMODE_WS_SERVING)
-					continue;
+			/*
+			 * never broadcast to non-established connections
+			 */
+			if (wsi->state != WSI_STATE_ESTABLISHED)
+				continue;
 
-				/*
-				 * never broadcast to
-				 * non-established connections
-				 */
-				if (wsi->state != WSI_STATE_ESTABLISHED)
-					continue;
+			/* only broadcast to guys using
+			 * requested protocol
+			 */
+			if (wsi->protocol != protocol)
+				continue;
 
-				/* only broadcast to guys using
-				 * requested protocol
-				 */
-				if (wsi->protocol != protocol)
-					continue;
-
-				wsi->protocol->callback(context, wsi,
-					 LWS_CALLBACK_BROADCAST,
-					 wsi->user_space,
-					 buf, len);
-			}
+			wsi->protocol->callback(context, wsi,
+				 LWS_CALLBACK_BROADCAST,
+				 wsi->user_space,
+				 buf, len);
 		}
 
 		return 0;