autocreate foreign broadcast sockets on broadcast

Also introduce libwebsockets_broadcast_foreign() separate from libwebsockets_broadcast()

Signed-off-by: Andy Green <andy.green@linaro.org>
diff --git a/README.coding b/README.coding
index 0cfb07a..05bc09f 100644
--- a/README.coding
+++ b/README.coding
@@ -25,6 +25,26 @@
 libwebsockets will adapt accordingly.
 
 
+Procedure for sending data from other threads or process contexts
+-----------------------------------------------------------------
+
+Libwebsockets is carefully designed to work with no blocking in a single thread.
+In some cases where you will add libwebsockets to something else that uses the
+same single thread approach, you can so a safe implementation by combining the
+poll() loops as described in "External Polling loop support" below.
+
+In other cases, you find you have asynchronous events coming from other thread
+or process contexts and there's not much you can do about it.  If you just try
+to randomly send, or broadcast using libwebsockets_broadcast() from these other
+places things will blow up either quickly or when the events on the two threads
+interefere with each other.  It's not legal to do this.
+
+For those situations, you can use libwebsockets_broadcast_foreign().  This
+serializes the data you're sending using a private, per-protocol socket, so the
+service thread picks it up when it's ready, and it is serviced from the service
+thread context only.
+
+
 Fragmented messages
 -------------------
 
@@ -55,6 +75,7 @@
 The test app llibwebsockets-test-fraggle sources also show how to
 deal with fragmented messages.
 
+
 Debug Logging
 -------------
 
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index 188bb27..b2f4e91 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -1974,10 +1974,7 @@
 int
 libwebsockets_fork_service_loop(struct libwebsocket_context *context)
 {
-	int fd;
-	struct sockaddr_in cli_addr;
 	int n;
-	int p;
 
 	n = fork();
 	if (n < 0)
@@ -1987,33 +1984,6 @@
 
 		/* main process context */
 
-		/*
-		 * set up the proxy sockets to allow broadcast from
-		 * service process context
-		 */
-
-		for (p = 0; p < context->count_protocols; p++) {
-			fd = socket(AF_INET, SOCK_STREAM, 0);
-			if (fd < 0) {
-				lwsl_err("Unable to create socket\n");
-				return -1;
-			}
-			cli_addr.sin_family = AF_INET;
-			cli_addr.sin_port = htons(
-			     context->protocols[p].broadcast_socket_port);
-			cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
-			n = connect(fd, (struct sockaddr *)&cli_addr,
-							       sizeof cli_addr);
-			if (n < 0) {
-				lwsl_err("Unable to connect to "
-						"broadcast socket %d, %s\n",
-						n, strerror(errno));
-				return -1;
-			}
-
-			context->protocols[p].broadcast_socket_user_fd = fd;
-		}
-
 		return 0;
 	}
 
@@ -2067,8 +2037,8 @@
 }
 
 /**
- * libwebsockets_broadcast() - Sends a buffer to the callback for all active
- *				  connections of the given protocol.
+ * libwebsockets_broadcast() - Sends a buffer to tx callback for all connections of given protocol from single thread
+ *
  * @protocol:	pointer to the protocol you will broadcast to all members of
  * @buf:  buffer containing the data to be broadcase.  NOTE: this has to be
  *		allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
@@ -2082,9 +2052,9 @@
  * wants to actually send the data for that connection, the callback itself
  * should call libwebsocket_write().
  *
- * libwebsockets_broadcast() can be called from another fork context without
- * having to take any care about data visibility between the processes, it'll
- * "just work".
+ * This version only works from the same thread / process context as the service
+ * loop.  Use libwesockets_broadcast_foreign(...) to do the same job from a different
+ * thread in a safe way.
  */
 
 
@@ -2099,53 +2069,84 @@
 	if (!context)
 		return 1;
 
-#ifndef LWS_NO_FORK
-	if (!protocol->broadcast_socket_user_fd) {
-#endif
+	/*
+	 * We are either running unforked / flat, or we are being
+	 * called from poll thread context
+	 * eg, from a callback.  In that case don't use sockets for
+	 * broadcast IPC (since we can't open a socket connection to
+	 * a socket listening on our own thread) but directly do the
+	 * send action.
+	 *
+	 * Locking is not needed because we are by definition being
+	 * called in the poll thread context and are serialized.
+	 */
+
+	for (n = 0; n < context->fds_count; n++) {
+
+		wsi = context->lws_lookup[context->fds[n].fd];
+		if (!wsi)
+			continue;
+
+		if (wsi->mode != LWS_CONNMODE_WS_SERVING)
+			continue;
+
 		/*
-		 * We are either running unforked / flat, or we are being
-		 * called from poll thread context
-		 * eg, from a callback.  In that case don't use sockets for
-		 * broadcast IPC (since we can't open a socket connection to
-		 * a socket listening on our own thread) but directly do the
-		 * send action.
-		 *
-		 * Locking is not needed because we are by definition being
-		 * called in the poll thread context and are serialized.
+		 * never broadcast to non-established connections
 		 */
+		if (wsi->state != WSI_STATE_ESTABLISHED)
+			continue;
 
-		for (n = 0; n < context->fds_count; n++) {
+		/* only broadcast to guys using
+		 * requested protocol
+		 */
+		if (wsi->protocol != protocol)
+			continue;
 
-			wsi = context->lws_lookup[context->fds[n].fd];
-			if (!wsi)
-				continue;
-
-			if (wsi->mode != LWS_CONNMODE_WS_SERVING)
-				continue;
-
-			/*
-			 * never broadcast to non-established connections
-			 */
-			if (wsi->state != WSI_STATE_ESTABLISHED)
-				continue;
-
-			/* only broadcast to guys using
-			 * requested protocol
-			 */
-			if (wsi->protocol != protocol)
-				continue;
-
-			user_callback_handle_rxflow(wsi->protocol->callback,
-				 context, wsi,
-				 LWS_CALLBACK_BROADCAST,
-				 wsi->user_space,
-				 buf, len);
-		}
-
-		return 0;
-#ifndef LWS_NO_FORK
+		user_callback_handle_rxflow(wsi->protocol->callback,
+			 context, wsi,
+			 LWS_CALLBACK_BROADCAST,
+			 wsi->user_space,
+			 buf, len);
 	}
 
+	return 0;
+}
+
+
+#ifndef LWS_NO_FORK
+/**
+ * libwebsockets_broadcast_foreign() - Sends a buffer to the callback for all active
+ *				  connections of the given protocol.
+ * @protocol:	pointer to the protocol you will broadcast to all members of
+ * @buf:  buffer containing the data to be broadcase.  NOTE: this has to be
+ *		allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
+ *		the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the
+ *		case you are calling this function from callback context.
+ * @len:	length of payload data in buf, starting from buf.
+ *
+ *	This function allows bulk sending of a packet to every connection using
+ * the given protocol.  It does not send the data directly; instead it calls
+ * the callback with a reason type of LWS_CALLBACK_BROADCAST.  If the callback
+ * wants to actually send the data for that connection, the callback itself
+ * should call libwebsocket_write().
+ *
+ * This ..._foreign() version is designed to be randomly called from other thread or
+ * process contexts than the main libwebsocket service one.  A private socket is used
+ * to serialize accesses here with the main service loop. 
+ */
+
+int
+libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol,
+						 unsigned char *buf, size_t len)
+{
+	struct libwebsocket_context *context = protocol->owning_server;
+	int n;
+	int fd;
+	struct sockaddr_in cli_addr;
+
+	if (!context)
+		return 1;
+
 	/*
 	 * We're being called from a different process context than the server
 	 * loop.  Instead of broadcasting directly, we send our
@@ -2156,11 +2157,37 @@
 	 * set up when the websocket server initializes
 	 */
 
+
+	/*
+	 * autoconnect to this protocol's broadcast proxy socket for this
+	 * thread if needed
+	 */
+
+	if (protocol->broadcast_socket_user_fd <= 0) {
+		fd = socket(AF_INET, SOCK_STREAM, 0);
+		if (fd < 0) {
+			lwsl_err("Unable to create socket\n");
+			return -1;
+		}
+		cli_addr.sin_family = AF_INET;
+		cli_addr.sin_port = htons(protocol->broadcast_socket_port);
+		cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
+		n = connect(fd, (struct sockaddr *)&cli_addr, sizeof cli_addr);
+		if (n < 0) {
+			lwsl_err("Unable to connect to "
+					"broadcast socket %d, %s\n",
+					n, strerror(errno));
+			return -1;
+		}
+
+		protocol->broadcast_socket_user_fd = fd;
+	}
+
 	n = send(protocol->broadcast_socket_user_fd, buf, len, MSG_NOSIGNAL);
 
 	return n;
-#endif
 }
+#endif
 
 int
 libwebsocket_is_final_fragment(struct libwebsocket *wsi)
diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h
index f1395ef..fadbdd1 100644
--- a/lib/libwebsockets.h
+++ b/lib/libwebsockets.h
@@ -790,6 +790,12 @@
 libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
 						unsigned char *buf, size_t len);
 
+/* notice - you need the pre- and post- padding allocation for buf below */
+
+LWS_EXTERN int
+libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol,
+						 unsigned char *buf, size_t len);
+
 LWS_EXTERN const struct libwebsocket_protocols *
 libwebsockets_get_protocol(struct libwebsocket *wsi);
 
diff --git a/libwebsockets-api-doc.html b/libwebsockets-api-doc.html
index e240eed..156afe7 100644
--- a/libwebsockets-api-doc.html
+++ b/libwebsockets-api-doc.html
@@ -426,7 +426,7 @@
 the callback.
 </blockquote>
 <hr>
-<h2>libwebsockets_broadcast - Sends a buffer to the callback for all active connections of the given protocol.</h2>
+<h2>libwebsockets_broadcast - Sends a buffer to tx callback for all connections of given protocol from single thread</h2>
 <i>int</i>
 <b>libwebsockets_broadcast</b>
 (<i>const struct libwebsocket_protocols *</i> <b>protocol</b>,
@@ -452,9 +452,40 @@
 wants to actually send the data for that connection, the callback itself
 should call <b>libwebsocket_write</b>.
 <p>
-<b>libwebsockets_broadcast</b> can be called from another fork context without
-having to take any care about data visibility between the processes, it'll
-"just work".
+This version only works from the same thread / process context as the service
+loop.  Use libwesockets_broadcast_foreign(...) to do the same job from a different
+thread in a safe way.
+</blockquote>
+<hr>
+<h2>libwebsockets_broadcast_foreign - Sends a buffer to the callback for all active connections of the given protocol.</h2>
+<i>int</i>
+<b>libwebsockets_broadcast_foreign</b>
+(<i>struct libwebsocket_protocols *</i> <b>protocol</b>,
+<i>unsigned char *</i> <b>buf</b>,
+<i>size_t</i> <b>len</b>)
+<h3>Arguments</h3>
+<dl>
+<dt><b>protocol</b>
+<dd>pointer to the protocol you will broadcast to all members of
+<dt><b>buf</b>
+<dd>buffer containing the data to be broadcase.  NOTE: this has to be
+allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
+the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the
+case you are calling this function from callback context.
+<dt><b>len</b>
+<dd>length of payload data in buf, starting from buf.
+</dl>
+<h3>Description</h3>
+<blockquote>
+This function allows bulk sending of a packet to every connection using
+the given protocol.  It does not send the data directly; instead it calls
+the callback with a reason type of LWS_CALLBACK_BROADCAST.  If the callback
+wants to actually send the data for that connection, the callback itself
+should call <b>libwebsocket_write</b>.
+<p>
+This ...<b>_foreign</b> version is designed to be randomly called from other thread or
+process contexts than the main libwebsocket service one.  A private socket is used
+to serialize accesses here with the main service loop. 
 </blockquote>
 <hr>
 <h2>lws_confirm_legit_wsi - </h2>
diff --git a/test-server/test-server.c b/test-server/test-server.c
index 4e90b34..2fa0eae 100644
--- a/test-server/test-server.c
+++ b/test-server/test-server.c
@@ -717,7 +717,7 @@
 		 * We take care of pre-and-post padding allocation.
 		 */
 
-		libwebsockets_broadcast(&protocols[PROTOCOL_DUMB_INCREMENT],
+		libwebsockets_broadcast_foreign(&protocols[PROTOCOL_DUMB_INCREMENT],
 					&buf[LWS_SEND_BUFFER_PRE_PADDING], 1);
 	}