autocreate foreign broadcast sockets on broadcast
Also introduce libwebsockets_broadcast_foreign() separate from libwebsockets_broadcast()
Signed-off-by: Andy Green <andy.green@linaro.org>
diff --git a/README.coding b/README.coding
index 0cfb07a..05bc09f 100644
--- a/README.coding
+++ b/README.coding
@@ -25,6 +25,26 @@
libwebsockets will adapt accordingly.
+Procedure for sending data from other threads or process contexts
+-----------------------------------------------------------------
+
+Libwebsockets is carefully designed to work with no blocking in a single thread.
+In some cases where you will add libwebsockets to something else that uses the
+same single thread approach, you can so a safe implementation by combining the
+poll() loops as described in "External Polling loop support" below.
+
+In other cases, you find you have asynchronous events coming from other thread
+or process contexts and there's not much you can do about it. If you just try
+to randomly send, or broadcast using libwebsockets_broadcast() from these other
+places things will blow up either quickly or when the events on the two threads
+interefere with each other. It's not legal to do this.
+
+For those situations, you can use libwebsockets_broadcast_foreign(). This
+serializes the data you're sending using a private, per-protocol socket, so the
+service thread picks it up when it's ready, and it is serviced from the service
+thread context only.
+
+
Fragmented messages
-------------------
@@ -55,6 +75,7 @@
The test app llibwebsockets-test-fraggle sources also show how to
deal with fragmented messages.
+
Debug Logging
-------------
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index 188bb27..b2f4e91 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -1974,10 +1974,7 @@
int
libwebsockets_fork_service_loop(struct libwebsocket_context *context)
{
- int fd;
- struct sockaddr_in cli_addr;
int n;
- int p;
n = fork();
if (n < 0)
@@ -1987,33 +1984,6 @@
/* main process context */
- /*
- * set up the proxy sockets to allow broadcast from
- * service process context
- */
-
- for (p = 0; p < context->count_protocols; p++) {
- fd = socket(AF_INET, SOCK_STREAM, 0);
- if (fd < 0) {
- lwsl_err("Unable to create socket\n");
- return -1;
- }
- cli_addr.sin_family = AF_INET;
- cli_addr.sin_port = htons(
- context->protocols[p].broadcast_socket_port);
- cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
- n = connect(fd, (struct sockaddr *)&cli_addr,
- sizeof cli_addr);
- if (n < 0) {
- lwsl_err("Unable to connect to "
- "broadcast socket %d, %s\n",
- n, strerror(errno));
- return -1;
- }
-
- context->protocols[p].broadcast_socket_user_fd = fd;
- }
-
return 0;
}
@@ -2067,8 +2037,8 @@
}
/**
- * libwebsockets_broadcast() - Sends a buffer to the callback for all active
- * connections of the given protocol.
+ * libwebsockets_broadcast() - Sends a buffer to tx callback for all connections of given protocol from single thread
+ *
* @protocol: pointer to the protocol you will broadcast to all members of
* @buf: buffer containing the data to be broadcase. NOTE: this has to be
* allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
@@ -2082,9 +2052,9 @@
* wants to actually send the data for that connection, the callback itself
* should call libwebsocket_write().
*
- * libwebsockets_broadcast() can be called from another fork context without
- * having to take any care about data visibility between the processes, it'll
- * "just work".
+ * This version only works from the same thread / process context as the service
+ * loop. Use libwesockets_broadcast_foreign(...) to do the same job from a different
+ * thread in a safe way.
*/
@@ -2099,53 +2069,84 @@
if (!context)
return 1;
-#ifndef LWS_NO_FORK
- if (!protocol->broadcast_socket_user_fd) {
-#endif
+ /*
+ * We are either running unforked / flat, or we are being
+ * called from poll thread context
+ * eg, from a callback. In that case don't use sockets for
+ * broadcast IPC (since we can't open a socket connection to
+ * a socket listening on our own thread) but directly do the
+ * send action.
+ *
+ * Locking is not needed because we are by definition being
+ * called in the poll thread context and are serialized.
+ */
+
+ for (n = 0; n < context->fds_count; n++) {
+
+ wsi = context->lws_lookup[context->fds[n].fd];
+ if (!wsi)
+ continue;
+
+ if (wsi->mode != LWS_CONNMODE_WS_SERVING)
+ continue;
+
/*
- * We are either running unforked / flat, or we are being
- * called from poll thread context
- * eg, from a callback. In that case don't use sockets for
- * broadcast IPC (since we can't open a socket connection to
- * a socket listening on our own thread) but directly do the
- * send action.
- *
- * Locking is not needed because we are by definition being
- * called in the poll thread context and are serialized.
+ * never broadcast to non-established connections
*/
+ if (wsi->state != WSI_STATE_ESTABLISHED)
+ continue;
- for (n = 0; n < context->fds_count; n++) {
+ /* only broadcast to guys using
+ * requested protocol
+ */
+ if (wsi->protocol != protocol)
+ continue;
- wsi = context->lws_lookup[context->fds[n].fd];
- if (!wsi)
- continue;
-
- if (wsi->mode != LWS_CONNMODE_WS_SERVING)
- continue;
-
- /*
- * never broadcast to non-established connections
- */
- if (wsi->state != WSI_STATE_ESTABLISHED)
- continue;
-
- /* only broadcast to guys using
- * requested protocol
- */
- if (wsi->protocol != protocol)
- continue;
-
- user_callback_handle_rxflow(wsi->protocol->callback,
- context, wsi,
- LWS_CALLBACK_BROADCAST,
- wsi->user_space,
- buf, len);
- }
-
- return 0;
-#ifndef LWS_NO_FORK
+ user_callback_handle_rxflow(wsi->protocol->callback,
+ context, wsi,
+ LWS_CALLBACK_BROADCAST,
+ wsi->user_space,
+ buf, len);
}
+ return 0;
+}
+
+
+#ifndef LWS_NO_FORK
+/**
+ * libwebsockets_broadcast_foreign() - Sends a buffer to the callback for all active
+ * connections of the given protocol.
+ * @protocol: pointer to the protocol you will broadcast to all members of
+ * @buf: buffer containing the data to be broadcase. NOTE: this has to be
+ * allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
+ * the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the
+ * case you are calling this function from callback context.
+ * @len: length of payload data in buf, starting from buf.
+ *
+ * This function allows bulk sending of a packet to every connection using
+ * the given protocol. It does not send the data directly; instead it calls
+ * the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback
+ * wants to actually send the data for that connection, the callback itself
+ * should call libwebsocket_write().
+ *
+ * This ..._foreign() version is designed to be randomly called from other thread or
+ * process contexts than the main libwebsocket service one. A private socket is used
+ * to serialize accesses here with the main service loop.
+ */
+
+int
+libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol,
+ unsigned char *buf, size_t len)
+{
+ struct libwebsocket_context *context = protocol->owning_server;
+ int n;
+ int fd;
+ struct sockaddr_in cli_addr;
+
+ if (!context)
+ return 1;
+
/*
* We're being called from a different process context than the server
* loop. Instead of broadcasting directly, we send our
@@ -2156,11 +2157,37 @@
* set up when the websocket server initializes
*/
+
+ /*
+ * autoconnect to this protocol's broadcast proxy socket for this
+ * thread if needed
+ */
+
+ if (protocol->broadcast_socket_user_fd <= 0) {
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ if (fd < 0) {
+ lwsl_err("Unable to create socket\n");
+ return -1;
+ }
+ cli_addr.sin_family = AF_INET;
+ cli_addr.sin_port = htons(protocol->broadcast_socket_port);
+ cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
+ n = connect(fd, (struct sockaddr *)&cli_addr, sizeof cli_addr);
+ if (n < 0) {
+ lwsl_err("Unable to connect to "
+ "broadcast socket %d, %s\n",
+ n, strerror(errno));
+ return -1;
+ }
+
+ protocol->broadcast_socket_user_fd = fd;
+ }
+
n = send(protocol->broadcast_socket_user_fd, buf, len, MSG_NOSIGNAL);
return n;
-#endif
}
+#endif
int
libwebsocket_is_final_fragment(struct libwebsocket *wsi)
diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h
index f1395ef..fadbdd1 100644
--- a/lib/libwebsockets.h
+++ b/lib/libwebsockets.h
@@ -790,6 +790,12 @@
libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
unsigned char *buf, size_t len);
+/* notice - you need the pre- and post- padding allocation for buf below */
+
+LWS_EXTERN int
+libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol,
+ unsigned char *buf, size_t len);
+
LWS_EXTERN const struct libwebsocket_protocols *
libwebsockets_get_protocol(struct libwebsocket *wsi);
diff --git a/libwebsockets-api-doc.html b/libwebsockets-api-doc.html
index e240eed..156afe7 100644
--- a/libwebsockets-api-doc.html
+++ b/libwebsockets-api-doc.html
@@ -426,7 +426,7 @@
the callback.
</blockquote>
<hr>
-<h2>libwebsockets_broadcast - Sends a buffer to the callback for all active connections of the given protocol.</h2>
+<h2>libwebsockets_broadcast - Sends a buffer to tx callback for all connections of given protocol from single thread</h2>
<i>int</i>
<b>libwebsockets_broadcast</b>
(<i>const struct libwebsocket_protocols *</i> <b>protocol</b>,
@@ -452,9 +452,40 @@
wants to actually send the data for that connection, the callback itself
should call <b>libwebsocket_write</b>.
<p>
-<b>libwebsockets_broadcast</b> can be called from another fork context without
-having to take any care about data visibility between the processes, it'll
-"just work".
+This version only works from the same thread / process context as the service
+loop. Use libwesockets_broadcast_foreign(...) to do the same job from a different
+thread in a safe way.
+</blockquote>
+<hr>
+<h2>libwebsockets_broadcast_foreign - Sends a buffer to the callback for all active connections of the given protocol.</h2>
+<i>int</i>
+<b>libwebsockets_broadcast_foreign</b>
+(<i>struct libwebsocket_protocols *</i> <b>protocol</b>,
+<i>unsigned char *</i> <b>buf</b>,
+<i>size_t</i> <b>len</b>)
+<h3>Arguments</h3>
+<dl>
+<dt><b>protocol</b>
+<dd>pointer to the protocol you will broadcast to all members of
+<dt><b>buf</b>
+<dd>buffer containing the data to be broadcase. NOTE: this has to be
+allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
+the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the
+case you are calling this function from callback context.
+<dt><b>len</b>
+<dd>length of payload data in buf, starting from buf.
+</dl>
+<h3>Description</h3>
+<blockquote>
+This function allows bulk sending of a packet to every connection using
+the given protocol. It does not send the data directly; instead it calls
+the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback
+wants to actually send the data for that connection, the callback itself
+should call <b>libwebsocket_write</b>.
+<p>
+This ...<b>_foreign</b> version is designed to be randomly called from other thread or
+process contexts than the main libwebsocket service one. A private socket is used
+to serialize accesses here with the main service loop.
</blockquote>
<hr>
<h2>lws_confirm_legit_wsi - </h2>
diff --git a/test-server/test-server.c b/test-server/test-server.c
index 4e90b34..2fa0eae 100644
--- a/test-server/test-server.c
+++ b/test-server/test-server.c
@@ -717,7 +717,7 @@
* We take care of pre-and-post padding allocation.
*/
- libwebsockets_broadcast(&protocols[PROTOCOL_DUMB_INCREMENT],
+ libwebsockets_broadcast_foreign(&protocols[PROTOCOL_DUMB_INCREMENT],
&buf[LWS_SEND_BUFFER_PRE_PADDING], 1);
}