blob: 616300c6b4b1eaca988c64f722ff79fd1ca51959 [file] [log] [blame]
/*
 * lws-minimal-http-server-sse
 *
 * Copyright (C) 2018 Andy Green <andy@warmcat.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 * This demonstrates a minimal http server that can serve both normal static
 * content and server-sent event (SSE) connections.
 *
 * To keep it simple, it serves the static stuff from the subdirectory
 * "./mount-origin" of the directory it was started in.
 *
 * You can change that by changing mount.origin below.
 */
17
18#include <libwebsockets.h>
19#include <string.h>
20#include <stdlib.h>
21#include <signal.h>
22#include <pthread.h>
23
/*
 * One queued message.  The ringbuffer stores these by value, one per
 * element; the text itself lives in a separate heap allocation.
 */

struct msg {
	void *payload;	/* malloc'd buffer holding the message text */
	size_t len;	/* number of characters formatted into payload */
};
30
/*
 * Per-session state.
 *
 * http is stateless, unlike ws: a pss only lives as long as one http
 * transaction.  With http/1.1 keep-alive and http/2 that is unrelated to
 * (and shorter than) the lifetime of the underlying network connection.
 */
struct pss {
	struct pss *pss_list;	/* next live pss on the same vhost, or NULL */
	struct lws *wsi;	/* the connection this session belongs to */
	uint32_t tail;		/* this session's read position in the ring */
};
41
/* per-vhost state: one instance exists for each vhost using our protocol */

struct vhd {
	struct lws_context *context;
	struct lws_vhost *vhost;
	const struct lws_protocols *protocol;

	struct pss *pss_list;		/* head of the list of live pss */
	pthread_t pthread_spam[2];	/* the message-generating threads */

	pthread_mutex_t lock_ring;	/* serializes ringbuffer access */
	struct lws_ring *ring;		/* messages not yet sent to everyone */
	char finished;			/* nonzero asks the threads to exit */
};
56
/*
 * Set to 1 by the SIGINT handler and polled by the service loop in main().
 * volatile sig_atomic_t is the object type the C standard allows a signal
 * handler to write.
 */
static volatile sig_atomic_t interrupted;
58
59/* destroys the message when everyone has had a copy of it */
60
61static void
62__minimal_destroy_message(void *_msg)
63{
64 struct msg *msg = _msg;
65
66 free(msg->payload);
67 msg->payload = NULL;
68 msg->len = 0;
69}
70
71/*
72 * This runs under the "spam thread" thread context only.
73 *
74 * We spawn two threads that generate messages with this.
75 *
76 */
77
78static void *
79thread_spam(void *d)
80{
81 struct vhd *vhd = (struct vhd *)d;
82 struct msg amsg;
83 int len = 128, index = 1, n;
84
85 do {
86 /* don't generate output if nobody connected */
87 if (!vhd->pss_list)
88 goto wait;
89
90 pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
91
92 /* only create if space in ringbuffer */
93 n = (int)lws_ring_get_count_free_elements(vhd->ring);
94 if (!n) {
95 lwsl_user("dropping!\n");
96 goto wait_unlock;
97 }
98
99 amsg.payload = malloc(len);
100 if (!amsg.payload) {
101 lwsl_user("OOM: dropping\n");
102 goto wait_unlock;
103 }
104 n = lws_snprintf((char *)amsg.payload, len,
105 "%s: tid: %p, msg: %d", __func__,
106 (void *)pthread_self(), index++);
107 amsg.len = n;
108 n = lws_ring_insert(vhd->ring, &amsg, 1);
109 if (n != 1) {
110 __minimal_destroy_message(&amsg);
111 lwsl_user("dropping!\n");
112 } else
113 /*
114 * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED
115 * in the lws service thread context.
116 */
117 lws_cancel_service(vhd->context);
118
119wait_unlock:
120 pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
121
122wait:
Andy Greende064fd2018-05-03 10:49:36 +0800123 /* rand() would make more sense but coverity shrieks */
124 usleep(100000 + (time(NULL) & 0xffff));
Andy Green97e36d82018-04-20 07:15:42 +0800125
126 } while (!vhd->finished);
127
128 lwsl_notice("thread_spam %p exiting\n", (void *)pthread_self());
129
130 pthread_exit(NULL);
131}
132
133
134static int
135callback_sse(struct lws *wsi, enum lws_callback_reasons reason, void *user,
136 void *in, size_t len)
137{
138 struct pss *pss = (struct pss *)user;
139 struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
140 lws_get_vhost(wsi), lws_get_protocol(wsi));
141 uint8_t buf[LWS_PRE + 256], *start = &buf[LWS_PRE], *p = start,
142 *end = &buf[sizeof(buf) - 1];
143 const struct msg *pmsg;
144 void *retval;
145 int n;
146
147 switch (reason) {
148
149 /* --- vhost protocol lifecycle --- */
150
151 case LWS_CALLBACK_PROTOCOL_INIT:
152 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
153 lws_get_protocol(wsi), sizeof(struct vhd));
154 vhd->context = lws_get_context(wsi);
155 vhd->protocol = lws_get_protocol(wsi);
156 vhd->vhost = lws_get_vhost(wsi);
157
158 vhd->ring = lws_ring_create(sizeof(struct msg), 8,
159 __minimal_destroy_message);
160 if (!vhd->ring)
161 return 1;
162
163 pthread_mutex_init(&vhd->lock_ring, NULL);
164
165 /* start the content-creating threads */
166
167 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
168 if (pthread_create(&vhd->pthread_spam[n], NULL,
169 thread_spam, vhd)) {
170 lwsl_err("thread creation failed\n");
171 goto init_fail;
172 }
173
174 return 0;
175
176 case LWS_CALLBACK_PROTOCOL_DESTROY:
177 init_fail:
178 vhd->finished = 1;
179 for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
180 if (vhd->pthread_spam[n])
181 pthread_join(vhd->pthread_spam[n], &retval);
182
183 if (vhd->ring)
184 lws_ring_destroy(vhd->ring);
185
186 pthread_mutex_destroy(&vhd->lock_ring);
187 return 0;
188
189 /* --- http connection lifecycle --- */
190
191 case LWS_CALLBACK_HTTP:
192 /*
193 * `in` contains the url part after our mountpoint /sse, if any
194 * you can use this to determine what data to return and store
195 * that in the pss
196 */
197 lwsl_info("%s: LWS_CALLBACK_HTTP: '%s'\n", __func__,
198 (const char *)in);
199
200 /* SSE requires a http OK response with this content-type */
201
202 if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
203 "text/event-stream",
204 LWS_ILLEGAL_HTTP_CONTENT_LEN,
205 &p, end))
206 return 1;
207
208 if (lws_finalize_write_http_header(wsi, start, &p, end))
209 return 1;
210
211 /* add ourselves to the list of live pss held in the vhd */
212
213 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
214 pss->tail = lws_ring_get_oldest_tail(vhd->ring);
215 pss->wsi = wsi;
216
217 /* Unlike a normal http connection, we don't want any specific
218 * timeout. We want to stay up until the client drops us */
219
220 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
221
222 /* write the body separately */
223
224 lws_callback_on_writable(wsi);
225
226 return 0;
227
228 case LWS_CALLBACK_CLOSED_HTTP:
229 /* remove our closing pss from the list of live pss */
230
231 lws_ll_fwd_remove(struct pss, pss_list, pss, vhd->pss_list);
232 return 0;
233
234 /* --- data transfer --- */
235
236 case LWS_CALLBACK_HTTP_WRITEABLE:
237
238 lwsl_info("%s: LWS_CALLBACK_HTTP_WRITEABLE\n", __func__);
239
240 pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
241 if (!pmsg)
242 break;
243
244 p += lws_snprintf((char *)p, end - p,
245 "data: %s\x0d\x0a\x0d\x0a",
246 (const char *)pmsg->payload);
247
248 if (lws_write(wsi, (uint8_t *)start, lws_ptr_diff(p, start),
249 LWS_WRITE_HTTP) != lws_ptr_diff(p, start))
250 return 1;
251
252 lws_ring_consume_and_update_oldest_tail(
253 vhd->ring, /* lws_ring object */
254 struct pss, /* type of objects with tails */
255 &pss->tail, /* tail of guy doing the consuming */
256 1, /* number of payload objects being consumed */
257 vhd->pss_list, /* head of list of objects with tails */
258 tail, /* member name of tail in objects with tails */
259 pss_list /* member name of next object in objects with tails */
260 );
261
262 if (lws_ring_get_element(vhd->ring, &pss->tail))
263 /* come back as soon as we can write more */
264 lws_callback_on_writable(pss->wsi);
265
266 return 0;
267
268 case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
269 /*
270 * let everybody know we want to write something on them
271 * as soon as they are ready
272 */
273 lws_start_foreach_llp(struct pss **, ppss, vhd->pss_list) {
274 lws_callback_on_writable((*ppss)->wsi);
275 } lws_end_foreach_llp(ppss, pss_list);
276 return 0;
277
278 default:
279 break;
280 }
281
282 return lws_callback_http_dummy(wsi, reason, user, in, len);
283}
284
285static struct lws_protocols protocols[] = {
286 { "http", lws_callback_http_dummy, 0, 0 },
287 { "sse", callback_sse, sizeof(struct pss), 0 },
288 { NULL, NULL, 0, 0 } /* terminator */
289};
290
291/* override the default mount for /sse in the URL space */
292
293static const struct lws_http_mount mount_sse = {
294 /* .mount_next */ NULL, /* linked-list "next" */
295 /* .mountpoint */ "/sse", /* mountpoint URL */
296 /* .origin */ NULL, /* protocol */
297 /* .def */ NULL,
298 /* .protocol */ "sse",
299 /* .cgienv */ NULL,
300 /* .extra_mimetypes */ NULL,
301 /* .interpret */ NULL,
302 /* .cgi_timeout */ 0,
303 /* .cache_max_age */ 0,
304 /* .auth_mask */ 0,
305 /* .cache_reusable */ 0,
306 /* .cache_revalidate */ 0,
307 /* .cache_intermediaries */ 0,
308 /* .origin_protocol */ LWSMPRO_CALLBACK, /* dynamic */
309 /* .mountpoint_len */ 4, /* char count */
310 /* .basic_auth_login_file */ NULL,
311};
312
313/* default mount serves the URL space from ./mount-origin */
314
315static const struct lws_http_mount mount = {
316 /* .mount_next */ &mount_sse, /* linked-list "next" */
317 /* .mountpoint */ "/", /* mountpoint URL */
318 /* .origin */ "./mount-origin", /* serve from dir */
319 /* .def */ "index.html", /* default filename */
320 /* .protocol */ NULL,
321 /* .cgienv */ NULL,
322 /* .extra_mimetypes */ NULL,
323 /* .interpret */ NULL,
324 /* .cgi_timeout */ 0,
325 /* .cache_max_age */ 0,
326 /* .auth_mask */ 0,
327 /* .cache_reusable */ 0,
328 /* .cache_revalidate */ 0,
329 /* .cache_intermediaries */ 0,
330 /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */
331 /* .mountpoint_len */ 1, /* char count */
332 /* .basic_auth_login_file */ NULL,
333};
334
335void sigint_handler(int sig)
336{
337 interrupted = 1;
338}
339
340int main(int argc, const char **argv)
341{
342 struct lws_context_creation_info info;
343 struct lws_context *context;
344 const char *p;
345 int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
346 /* for LLL_ verbosity above NOTICE to be built into lws,
347 * lws must have been configured and built with
348 * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
349 /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
350 /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
351 /* | LLL_DEBUG */;
352
353 signal(SIGINT, sigint_handler);
354
355 if ((p = lws_cmdline_option(argc, argv, "-d")))
356 logs = atoi(p);
357
358 lws_set_log_level(logs, NULL);
359 lwsl_user("LWS minimal http Server-Side Events + ring | visit http://localhost:7681\n");
360
361 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
362 info.port = 7681;
363 info.protocols = protocols;
364 info.mounts = &mount;
365
366 context = lws_create_context(&info);
367 if (!context) {
368 lwsl_err("lws init failed\n");
369 return 1;
370 }
371
372 while (n >= 0 && !interrupted)
373 n = lws_service(context, 1000);
374
375 lws_context_destroy(context);
376
377 return 0;
378}