blob: bddd5c920e0af43429c3e463899e339c4b01bebb [file] [log] [blame]
/*
 * Sigma Control API DUT (station/AP)
 * Copyright (c) 2010, Atheros Communications, Inc.
 * Copyright (c) 2011-2015, Qualcomm Atheros, Inc.
 * All Rights Reserved.
 * Licensed under the Clear BSD license. See README for more details.
 */
8
9#include "sigma_dut.h"
10
11#define TG_MAX_CLIENTS_CONNECTIONS 1
12
13
14static int cmd_traffic_agent_config(struct sigma_dut *dut,
15 struct sigma_conn *conn,
16 struct sigma_cmd *cmd)
17{
18 struct sigma_stream *s;
19 const char *val;
20 char buf[100];
21
22 if (dut->num_streams == MAX_SIGMA_STREAMS) {
23 send_resp(dut, conn, SIGMA_ERROR, "errorCode,No more "
24 "concurrent traffic streams supported");
25 return 0;
26 }
27
28 s = &dut->streams[dut->num_streams];
29 free(s->stats);
30 memset(s, 0, sizeof(*s));
31 s->sock = -1;
32 s->no_timestamps = dut->no_timestamps;
33
34 val = get_param(cmd, "profile");
35 if (!val)
36 return -1;
37
38 if (strcasecmp(val, "File_Transfer") == 0)
39 s->profile = SIGMA_PROFILE_FILE_TRANSFER;
40 else if (strcasecmp(val, "Multicast") == 0)
41 s->profile = SIGMA_PROFILE_MULTICAST;
42 else if (strcasecmp(val, "IPTV") == 0)
43 s->profile = SIGMA_PROFILE_IPTV;
44 else if (strcasecmp(val, "Transaction") == 0)
45 s->profile = SIGMA_PROFILE_TRANSACTION;
46 else if (strcasecmp(val, "Start_Sync") == 0)
47 s->profile = SIGMA_PROFILE_START_SYNC;
48 else if (strcasecmp(val, "Uapsd") == 0)
49 s->profile = SIGMA_PROFILE_UAPSD;
50 else {
51 send_resp(dut, conn, SIGMA_INVALID, "errorCode,Unsupported "
52 "profile");
53 return 0;
54 }
55
56 val = get_param(cmd, "direction");
57 if (!val)
58 return -1;
59 if (strcasecmp(val, "send") == 0)
60 s->sender = 1;
61 else if (strcasecmp(val, "receive") == 0)
62 s->sender = 0;
63 else
64 return -1;
65
66 val = get_param(cmd, "destination");
67 if (val) {
68 if (inet_aton(val, &s->dst) == 0)
69 return -1;
70 }
71
72 val = get_param(cmd, "source");
73 if (val) {
74 if (inet_aton(val, &s->src) == 0)
75 return -1;
76 }
77
78 val = get_param(cmd, "destinationPort");
79 if (val)
80 s->dst_port = atoi(val);
81
82 val = get_param(cmd, "sourcePort");
83 if (val)
84 s->src_port = atoi(val);
85
86 val = get_param(cmd, "frameRate");
87 if (val)
88 s->frame_rate = atoi(val);
89
90 val = get_param(cmd, "duration");
91 if (val)
92 s->duration = atoi(val);
93
94 val = get_param(cmd, "payloadSize");
95 if (val)
96 s->payload_size = atoi(val);
97
98 val = get_param(cmd, "startDelay");
99 if (val)
100 s->start_delay = atoi(val);
101
102 val = get_param(cmd, "maxCnt");
103 if (val)
104 s->max_cnt = atoi(val);
105
106 val = get_param(cmd, "trafficClass");
107 if (val) {
108 if (strcasecmp(val, "Voice") == 0)
109 s->tc = SIGMA_TC_VOICE;
110 else if (strcasecmp(val, "Video") == 0)
111 s->tc = SIGMA_TC_VIDEO;
112 else if (strcasecmp(val, "Background") == 0)
113 s->tc = SIGMA_TC_BACKGROUND;
114 else if (strcasecmp(val, "BestEffort") == 0)
115 s->tc = SIGMA_TC_BEST_EFFORT;
116 else
117 return -1;
118 }
119
120 val = get_param(cmd, "userpriority");
121 if (val) {
122 s->user_priority_set = 1;
123 s->user_priority = atoi(val);
124 }
125
Pradeep Reddy POTTETI79594042015-11-23 12:59:12 +0530126 val = get_param(cmd, "tagName");
127 if (val) {
128 strncpy(s->test_name, val, sizeof(s->test_name));
129 s->test_name[sizeof(s->test_name) - 1] = '\0';
130 sigma_dut_print(dut, DUT_MSG_DEBUG,
131 "Traffic agent: U-APSD console tagname %s",
132 s->test_name);
133 }
134
Jouni Malinencd4e3c32015-10-29 12:39:56 +0200135 if (dut->throughput_pktsize && s->frame_rate == 0 && s->sender &&
136 dut->throughput_pktsize != s->payload_size &&
137 (s->profile == SIGMA_PROFILE_FILE_TRANSFER ||
138 s->profile == SIGMA_PROFILE_IPTV ||
139 s->profile == SIGMA_PROFILE_UAPSD)) {
140 sigma_dut_print(dut, DUT_MSG_INFO, "Traffic agent: Override "
141 "throughput test payload size %d -> %d",
142 s->payload_size, dut->throughput_pktsize);
143 s->payload_size = dut->throughput_pktsize;
144 }
145
146 val = get_param(cmd, "transProtoType");
147 if (val) {
148 if (strcmp(val, "1") == 0)
149 s->trans_proto = IPPROTO_TCP;
150 else if (strcmp(val, "0") == 0)
151 s->trans_proto = IPPROTO_UDP;
152 else
153 return -1;
154 } else {
155 s->trans_proto = IPPROTO_UDP;
156 }
157
158 if (s->profile == SIGMA_PROFILE_IPTV && !s->sender && !s->no_timestamps)
159 {
160 s->stats = calloc(MAX_SIGMA_STATS,
161 sizeof(struct sigma_frame_stats));
162 if (s->stats == NULL)
163 return -1;
164 }
165
166 dut->stream_id++;
167 dut->num_streams++;
168
169 s->stream_id = dut->stream_id;
170 snprintf(buf, sizeof(buf), "streamID,%d", s->stream_id);
171 send_resp(dut, conn, SIGMA_COMPLETE, buf);
172 return 0;
173}
174
175
176static void stop_stream(struct sigma_stream *s)
177{
178 if (s && s->started) {
179 pthread_join(s->thr, NULL);
180 if (s->sock != -1) {
181 close(s->sock);
182 s->sock = -1;
183 }
184
185 s->started = 0;
186 }
187}
188
189
190static int cmd_traffic_agent_reset(struct sigma_dut *dut,
191 struct sigma_conn *conn,
192 struct sigma_cmd *cmd)
193{
194 int i;
195 for (i = 0; i < dut->num_streams; i++) {
196 struct sigma_stream *s = &dut->streams[i];
197 s->stop = 1;
198 stop_stream(s);
199 }
200 dut->num_streams = 0;
201 memset(&dut->streams, 0, sizeof(dut->streams));
202 return 1;
203}
204
205
206static int get_stream_id(const char *str, int streams[MAX_SIGMA_STREAMS])
207{
208 int count;
209
210 count = 0;
211 for (;;) {
212 if (count == MAX_SIGMA_STREAMS)
213 return -1;
214 streams[count] = atoi(str);
215 if (streams[count] == 0)
216 return -1;
217 count++;
218 str = strchr(str, ' ');
219 if (str == NULL)
220 break;
221 while (*str == ' ')
222 str++;
223 }
224
225 return count;
226}
227
228
229static int open_socket_file_transfer(struct sigma_dut *dut,
230 struct sigma_stream *s)
231{
232 struct sockaddr_in addr;
233 int sock_opt_val = 1;
234
235 s->sock = socket(PF_INET, IPPROTO_UDP == s->trans_proto ? SOCK_DGRAM :
236 SOCK_STREAM, s->trans_proto);
237 if (s->sock < 0) {
238 perror("socket");
239 return -1;
240 }
241
242 if (setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR, &sock_opt_val,
243 sizeof(sock_opt_val)) < 0) {
244 perror("setsockopt");
245 close(s->sock);
246 s->sock = -1;
247 return -1;
248 }
249
250 memset(&addr, 0, sizeof(addr));
251 addr.sin_family = AF_INET;
252 addr.sin_port = htons(s->sender ? s->src_port : s->dst_port);
253 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: sender=%d "
254 "bind port %d", s->sender, ntohs(addr.sin_port));
255 if (bind(s->sock, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
256 perror("bind");
257 close(s->sock);
258 s->sock = -1;
259 return -1;
260 }
261
262 if (s->profile == SIGMA_PROFILE_MULTICAST && !s->sender)
263 return 0;
264
265 if (s->trans_proto == IPPROTO_TCP && s->sender == 0) {
266 if (listen(s->sock, TG_MAX_CLIENTS_CONNECTIONS ) < 0) {
267 sigma_dut_print(dut, DUT_MSG_INFO,
268 "Listen failed with error %d: %s",
269 errno, strerror(errno));
270 close(s->sock);
271 s->sock = -1;
272 return -1;
273 }
274 } else {
275 memset(&addr, 0, sizeof(addr));
276 addr.sin_family = AF_INET;
277 addr.sin_addr.s_addr = s->sender ? s->dst.s_addr :
278 s->src.s_addr;
279 addr.sin_port = htons(s->sender ? s->dst_port : s->src_port);
280 sigma_dut_print(dut, DUT_MSG_DEBUG,
281 "Traffic agent: connect %s:%d",
282 inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
283 if (connect(s->sock, (struct sockaddr *) &addr, sizeof(addr)) <
284 0) {
285 perror("connect");
286 close(s->sock);
287 s->sock = -1;
288 return -1;
289 }
290 }
291
292 return 0;
293}
294
295
296static int open_socket_multicast(struct sigma_dut *dut, struct sigma_stream *s)
297{
298 if (open_socket_file_transfer(dut, s) < 0)
299 return -1;
300
301 if (!s->sender) {
302 struct ip_mreq mr;
303 memset(&mr, 0, sizeof(mr));
304 mr.imr_multiaddr.s_addr = s->dst.s_addr;
305 mr.imr_interface.s_addr = htonl(INADDR_ANY);
306 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: "
307 "IP_ADD_MEMBERSHIP %s", inet_ntoa(s->dst));
308 if (setsockopt(s->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
309 (void *) &mr, sizeof(mr)) < 0) {
310 sigma_dut_print(dut, DUT_MSG_INFO,
311 "setsockopt[IP_ADD_MEMBERSHIP]: %s",
312 strerror(errno));
313 /*
314 * Continue anyway since this can happen, e.g., if the
315 * default route is missing. This is not critical for
316 * multicast RX testing.
317 */
318 }
319 }
320
321 return 0;
322}
323
324
325static int set_socket_prio(struct sigma_stream *s)
326{
327 int tos = 0x00;
328
329 switch (s->tc) {
330 case SIGMA_TC_VOICE:
331 if (s->user_priority_set) {
332 if (s->user_priority == 6)
333 tos = 48 << 2;
334 else if (s->user_priority == 7)
335 tos = 56 << 2;
336 else
337 return -1;
338 } else
339 tos = 0xe0; /* DSCP = 56 */
340 break;
341 case SIGMA_TC_VIDEO:
342 if (s->user_priority_set) {
343 if (s->user_priority == 4)
344 tos = 32 << 2;
345 else if (s->user_priority == 5)
346 tos = 40 << 2;
347 else
348 return -1;
349 } else
350 tos = 0xa0; /* DSCP = 40 */
351 break;
352 case SIGMA_TC_BACKGROUND:
353 if (s->user_priority_set) {
354 if (s->user_priority == 1)
355 tos = 8 << 2;
356 else if (s->user_priority == 2)
357 tos = 16 << 2;
358 else
359 return -1;
360 } else
361 tos = 0x20; /* DSCP = 8 */
362 break;
363 case SIGMA_TC_BEST_EFFORT:
364 if (s->user_priority_set) {
365 if (s->user_priority == 0)
366 tos = 0 << 2;
367 else if (s->user_priority == 3)
368 tos = 20 << 2;
369 else
370 return -1;
371 } else
372 tos = 0x00; /* DSCP = 0 */
373 break;
374 }
375
376 if (setsockopt(s->sock, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) < 0) {
377 perror("setsockopt");
378 return -1;
379 }
380
381 return 0;
382}
383
384
385static int open_socket(struct sigma_dut *dut, struct sigma_stream *s)
386{
387 switch (s->profile) {
388 case SIGMA_PROFILE_FILE_TRANSFER:
389 return open_socket_file_transfer(dut, s);
390 case SIGMA_PROFILE_MULTICAST:
391 return open_socket_multicast(dut, s);
392 case SIGMA_PROFILE_IPTV:
393 if (open_socket_file_transfer(dut, s) < 0)
394 return -1;
395 return set_socket_prio(s);
396 case SIGMA_PROFILE_TRANSACTION:
397 return open_socket_file_transfer(dut, s);
398 case SIGMA_PROFILE_UAPSD:
399 return open_socket_file_transfer(dut, s);
400 case SIGMA_PROFILE_START_SYNC:
401 sigma_dut_print(dut, DUT_MSG_INFO, "Traffic stream profile %d "
402 "not yet supported", s->profile);
403 /* TODO */
404 break;
405 }
406
407 return -1;
408}
409
410
411static void send_file_fast(struct sigma_stream *s, char *pkt)
412{
413 struct timeval stop, now;
414 int res;
415 unsigned int counter = 0;
416
417 gettimeofday(&stop, NULL);
418 stop.tv_sec += s->duration;
419
420 while (!s->stop) {
421 counter++;
422 WPA_PUT_BE32(&pkt[8], counter);
423
424 if ((counter & 0xf) == 0) {
425 gettimeofday(&now, NULL);
426 if (now.tv_sec > stop.tv_sec ||
427 (now.tv_sec == stop.tv_sec &&
428 now.tv_usec >= stop.tv_usec))
429 break;
430 }
431
432 s->tx_act_frames++;
433 res = send(s->sock, pkt, s->payload_size, 0);
434 if (res >= 0) {
435 s->tx_frames++;
436 s->tx_payload_bytes += res;
437 } else {
438 switch (errno) {
439 case EAGAIN:
440 case ENOBUFS:
441 usleep(1000);
442 break;
443 case ECONNRESET:
444 case EPIPE:
445 s->stop = 1;
446 break;
447 default:
448 perror("send");
449 break;
450 }
451 }
452 }
453}
454
455
456static void send_file(struct sigma_stream *s)
457{
458 char *pkt;
459 struct timeval stop, now, start;
460 int res;
461 unsigned int counter = 0;
462 int sleep_time, sleep_usec, pkt_spacing, duration = 0;
463
464 if (s->duration <= 0 || s->frame_rate < 0 || s->payload_size < 20)
465 return;
466
467 pkt = malloc(s->payload_size);
468 if (pkt == NULL)
469 return;
470 memset(pkt, 1, s->payload_size);
471 strncpy(pkt, "1345678", s->payload_size);
472
473 if (s->frame_rate == 0 && s->no_timestamps) {
474 send_file_fast(s, pkt);
475 free(pkt);
476 return;
477 }
478
479 gettimeofday(&stop, NULL);
480 stop.tv_sec += s->duration;
481
482 if (s->frame_rate == 0) {
483 sleep_time = 0;
484 pkt_spacing = 0;
485 } else {
486 sleep_time = 1000000 / s->frame_rate;
487 pkt_spacing = sleep_time;
488 sleep_time -= 100;
489 if (sleep_time < 0)
490 sleep_time = 0;
491 }
492 sleep_usec = sleep_time;
493
494 gettimeofday(&start, NULL);
495
496 while (!s->stop) {
497 counter++;
498 WPA_PUT_BE32(&pkt[8], counter);
499
500 if (sleep_usec)
501 usleep(sleep_usec);
502
503 gettimeofday(&now, NULL);
504 if (now.tv_sec > stop.tv_sec ||
505 (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
506 break;
507
508 if (sleep_time) {
509 /* Update sleep time based on current time */
510 struct timeval tmp;
511 int diff;
512
513 duration += pkt_spacing;
514 timersub(&now, &start, &tmp);
515 diff = tmp.tv_sec * 1000000 + tmp.tv_usec;
516
517 if (diff > duration) {
518 sleep_usec = pkt_spacing - (diff - duration);
519 sleep_usec -= 100;
520 } else {
521 sleep_usec = pkt_spacing - 100;
522 }
523 if (sleep_usec < 0)
524 sleep_usec = 0;
525 }
526
527 WPA_PUT_BE32(&pkt[12], now.tv_sec);
528 WPA_PUT_BE32(&pkt[16], now.tv_usec);
529
530 s->tx_act_frames++;
531 res = send(s->sock, pkt, s->payload_size, 0);
532 if (res >= 0) {
533 s->tx_frames++;
534 s->tx_payload_bytes += res;
535 } else {
536 switch (errno) {
537 case EAGAIN:
538 case ENOBUFS:
539 usleep(1000);
540 break;
541 case ECONNRESET:
542 case EPIPE:
543 s->stop = 1;
544 break;
545 default:
546 perror("send");
547 break;
548 }
549 }
550 }
551
552 free(pkt);
553}
554
555
556static void send_transaction(struct sigma_stream *s)
557{
558 char *pkt, *rpkt;
559 struct timeval stop, now;
560 int res;
561 unsigned int counter = 0, rcounter;
562 int wait_time;
563 fd_set rfds;
564 struct timeval tv;
565
566 if (s->duration <= 0 || s->frame_rate <= 0 || s->payload_size < 20)
567 return;
568
569 pkt = malloc(s->payload_size);
570 if (pkt == NULL)
571 return;
572 rpkt = malloc(s->payload_size);
573 if (rpkt == NULL) {
574 free(pkt);
575 return;
576 }
577 memset(pkt, 1, s->payload_size);
578 strncpy(pkt, "1345678", s->payload_size);
579
580 gettimeofday(&stop, NULL);
581 stop.tv_sec += s->duration;
582
583 wait_time = 1000000 / s->frame_rate;
584
585 while (!s->stop) {
586 counter++;
587 if (s->max_cnt && (int) counter > s->max_cnt)
588 break;
589 WPA_PUT_BE32(&pkt[8], counter);
590
591 gettimeofday(&now, NULL);
592 if (now.tv_sec > stop.tv_sec ||
593 (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
594 break;
595 WPA_PUT_BE32(&pkt[12], now.tv_sec);
596 WPA_PUT_BE32(&pkt[16], now.tv_usec);
597
598 res = send(s->sock, pkt, s->payload_size, 0);
599 if (res >= 0) {
600 s->tx_frames++;
601 s->tx_payload_bytes += res;
602 } else {
603 switch (errno) {
604 case EAGAIN:
605 case ENOBUFS:
606 usleep(1000);
607 break;
608 case ECONNRESET:
609 case EPIPE:
610 s->stop = 1;
611 break;
612 default:
613 perror("send");
614 break;
615 }
616 }
617
618 /* Wait for response */
619 tv.tv_sec = 0;
620 tv.tv_usec = wait_time;
621 FD_ZERO(&rfds);
622 FD_SET(s->sock, &rfds);
623 res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
624 if (res < 0) {
625 if (errno == EINTR)
626 continue;
627 perror("select");
628 break;
629 }
630
631 if (res == 0) {
632 /* timeout */
633 continue;
634 }
635
636 if (FD_ISSET(s->sock, &rfds)) {
637 /* response received */
638 res = recv(s->sock, rpkt, s->payload_size, 0);
639 if (res < 0) {
640 perror("recv");
641 break;
642 }
643 rcounter = WPA_GET_BE32(&rpkt[8]);
644 if (rcounter != counter)
645 s->out_of_seq_frames++;
646 s->rx_frames++;
647 s->rx_payload_bytes += res;
648 }
649 }
650
651 free(pkt);
652 free(rpkt);
653}
654
655
656static void * send_thread(void *ctx)
657{
658 struct sigma_stream *s = ctx;
659
660 sleep(s->start_delay);
661
662 switch (s->profile) {
663 case SIGMA_PROFILE_FILE_TRANSFER:
664 send_file(s);
665 break;
666 case SIGMA_PROFILE_MULTICAST:
667 send_file(s);
668 break;
669 case SIGMA_PROFILE_IPTV:
670 send_file(s);
671 break;
672 case SIGMA_PROFILE_TRANSACTION:
673 send_transaction(s);
674 break;
675 case SIGMA_PROFILE_START_SYNC:
676 break;
677 case SIGMA_PROFILE_UAPSD:
Pradeep Reddy POTTETI79594042015-11-23 12:59:12 +0530678 send_uapsd_console(s);
Jouni Malinencd4e3c32015-10-29 12:39:56 +0200679 break;
680 }
681
682 return NULL;
683}
684
685
/*
 * Context handed from cmd_traffic_agent_send() to send_report_thread().
 * It is allocated by the command handler and freed by the report thread once
 * the response has been handled.
 */
struct traffic_agent_send_data {
	struct sigma_dut *dut; /* DUT context owning the streams */
	struct sigma_conn *conn; /* control connection for the response */
	int streams[MAX_SIGMA_STREAMS]; /* stream IDs from the command */
	int count; /* number of valid entries in streams[] */
};
692
693
694static struct sigma_stream * get_stream(struct sigma_dut *dut, int id)
695{
696 int i;
697
698 for (i = 0; i < dut->num_streams; i++) {
699 if ((unsigned int) id == dut->streams[i].stream_id)
700 return &dut->streams[i];
701 }
702
703 return NULL;
704}
705
706
707static void * send_report_thread(void *ctx)
708{
709 struct traffic_agent_send_data *data = ctx;
710 struct sigma_dut *dut = data->dut;
711 struct sigma_conn *conn = data->conn;
712 int i, ret;
713 char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
714
715 for (i = 0; i < data->count; i++) {
716 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: waiting "
717 "for stream %d send to complete",
718 data->streams[i]);
719 stop_stream(get_stream(dut, data->streams[i]));
720 }
721
722 buf[0] = '\0';
723 pos = buf;
724
725 pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
726 for (i = 0; i < data->count; i++) {
727 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
728 i > 0 ? " " : "", data->streams[i]);
729 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
730 break;
731 pos += ret;
732 }
733
734 if (dut->program == PROGRAM_60GHZ) {
735 sigma_dut_print(dut, DUT_MSG_INFO, "reporting tx_act_frames");
736 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
737 for (i = 0; i < data->count; i++) {
738 struct sigma_stream *s;
739
740 s = get_stream(dut, data->streams[i]);
741 if (!s)
742 continue;
743 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
744 i > 0 ? " " : "", s->tx_act_frames);
745 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
746 break;
747 pos += ret;
748 }
749 }
750
751 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
752 for (i = 0; i < data->count; i++) {
753 struct sigma_stream *s = get_stream(dut, data->streams[i]);
754
755 if (!s)
756 continue;
757 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
758 i > 0 ? " " : "", s->tx_frames);
759 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
760 break;
761 pos += ret;
762 }
763
764 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
765 for (i = 0; i < data->count; i++) {
766 struct sigma_stream *s = get_stream(dut, data->streams[i]);
767
768 if (!s)
769 continue;
770 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
771 i > 0 ? " " : "", s->rx_frames);
772 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
773 break;
774 pos += ret;
775 }
776
777 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
778 for (i = 0; i < data->count; i++) {
779 struct sigma_stream *s = get_stream(dut, data->streams[i]);
780
781 if (!s)
782 continue;
783 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
784 i > 0 ? " " : "", s->tx_payload_bytes);
785 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
786 break;
787 pos += ret;
788 }
789
790 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
791 for (i = 0; i < data->count; i++) {
792 struct sigma_stream *s = get_stream(dut, data->streams[i]);
793
794 if (!s)
795 continue;
796 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
797 i > 0 ? " " : "", s->rx_payload_bytes);
798 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
799 break;
800 pos += ret;
801 }
802
803 pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
804 for (i = 0; i < data->count; i++) {
805 struct sigma_stream *s = get_stream(dut, data->streams[i]);
806
807 if (!s)
808 continue;
809 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
810 i > 0 ? " " : "", s->out_of_seq_frames);
811 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
812 break;
813 pos += ret;
814 }
815
816 for (i = 0; i < data->count; i++) {
817 struct sigma_stream *s = get_stream(dut, data->streams[i]);
818 if (!s)
819 continue;
820 s->ta_send_in_progress = 0;
821 if (s->trans_proto == IPPROTO_TCP) {
822 /*
823 * Close the socket to make sure client side close the
824 * network before the server. Otherwise, the server
825 * might get "Address already in use" when trying to
826 * reuse the port.
827 */
828 close(s->sock);
829 s->sock = -1;
830 sigma_dut_print(dut, DUT_MSG_DEBUG,
831 "Closed the sender socket");
832 }
833 }
834
835 buf[sizeof(buf) - 1] = '\0';
836
837 if (conn->s < 0)
838 sigma_dut_print(dut, DUT_MSG_INFO, "Cannot send traffic_agent response since control socket has already been closed");
839 else
840 send_resp(dut, conn, SIGMA_COMPLETE, buf);
841 conn->waiting_completion = 0;
842
843 free(data);
844
845 return NULL;
846}
847
848
849static int cmd_traffic_agent_send(struct sigma_dut *dut,
850 struct sigma_conn *conn,
851 struct sigma_cmd *cmd)
852{
853 const char *val;
854 int i, j, res;
855 char buf[100];
856 struct traffic_agent_send_data *data;
857
858 val = get_param(cmd, "streamID");
859 if (val == NULL)
860 return -1;
861
862 data = calloc(1, sizeof(*data));
863 if (data == NULL)
864 return -1;
865 data->dut = dut;
866 data->conn = conn;
867
868 data->count = get_stream_id(val, data->streams);
869 if (data->count < 0) {
870 free(data);
871 return -1;
872 }
873 for (i = 0; i < data->count; i++) {
874 struct sigma_stream *s = get_stream(dut, data->streams[i]);
875
876 if (!s) {
877 snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
878 "not configured", data->streams[i]);
879 send_resp(dut, conn, SIGMA_INVALID, buf);
880 free(data);
881 return 0;
882 }
883 for (j = 0; j < i; j++)
884 if (data->streams[i] == data->streams[j])
885 return -1;
886 if (!s->sender) {
887 snprintf(buf, sizeof(buf), "errorCode,Not configured "
888 "as sender for streamID %d", data->streams[i]);
889 send_resp(dut, conn, SIGMA_INVALID, buf);
890 free(data);
891 return 0;
892 }
893 if (s->ta_send_in_progress) {
894 send_resp(dut, conn, SIGMA_ERROR,
895 "errorCode,Multiple concurrent send cmds on same streamID not supported");
896 free(data);
897 return 0;
898 }
899 }
900
901 for (i = 0; i < data->count; i++) {
902 struct sigma_stream *s = get_stream(dut, data->streams[i]);
903
904 if (!s)
905 continue;
906 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open "
907 "socket for send stream %d", data->streams[i]);
908 if (open_socket(dut, s) < 0) {
909 free(data);
910 return -2;
911 }
912 }
913
914 for (i = 0; i < data->count; i++) {
915 struct sigma_stream *s = get_stream(dut, data->streams[i]);
916
917 if (!s)
918 continue;
Pradeep Reddy POTTETI79594042015-11-23 12:59:12 +0530919
920 /*
921 * Provide dut context to the thread to support debugging and
922 * returning of error messages.
923 */
924 s->dut = dut;
925
Jouni Malinencd4e3c32015-10-29 12:39:56 +0200926 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start "
927 "send for stream %d", data->streams[i]);
928 res = pthread_create(&s->thr, NULL, send_thread, s);
929 if (res) {
930 sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create "
931 "failed: %d", res);
932 free(data);
933 return -2;
934 }
935 s->started = 1;
936 }
937
938 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start a thread to track sending streams");
939 conn->waiting_completion = 1;
940 res = pthread_create(&dut->thr, NULL, send_report_thread, data);
941 if (res) {
942 sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create failed: %d",
943 res);
944 free(data);
945 conn->waiting_completion = 0;
946 return -2;
947 }
948
949 for (i = 0; i < data->count; i++) {
950 struct sigma_stream *s = get_stream(dut, data->streams[i]);
951
952 if (s)
953 s->ta_send_in_progress = 1;
954 }
955
956 /* Command will be completed in send_report_thread() */
957
958 return 0;
959}
960
961
962static void receive_file(struct sigma_stream *s)
963{
964 struct timeval tv, now;
965 fd_set rfds;
966 int res;
967 char *pkt;
968 int pktlen;
969 unsigned int last_rx = 0, counter;
970
971 pktlen = 65536 + 1;
972 pkt = malloc(pktlen);
973 if (pkt == NULL)
974 return;
975
976 while (!s->stop) {
977 FD_ZERO(&rfds);
978 FD_SET(s->sock, &rfds);
979 tv.tv_sec = 0;
980 tv.tv_usec = 300000;
981 res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
982 if (res < 0) {
983 perror("select");
984 usleep(10000);
985 } else if (FD_ISSET(s->sock, &rfds)) {
986 res = recv(s->sock, pkt, pktlen, 0);
987 if (res >= 0) {
988 s->rx_frames++;
989 s->rx_payload_bytes += res;
990
991 counter = WPA_GET_BE32(&pkt[8]);
992 if (counter < last_rx)
993 s->out_of_seq_frames++;
994 last_rx = counter;
995 } else {
996 perror("recv");
997 break;
998 }
999
1000 if (res >= 20 && s->stats &&
1001 s->num_stats < MAX_SIGMA_STATS) {
1002 struct sigma_frame_stats *stats;
1003 stats = &s->stats[s->num_stats];
1004 s->num_stats++;
1005 gettimeofday(&now, NULL);
1006 stats->seqnum = counter;
1007 stats->local_sec = now.tv_sec;
1008 stats->local_usec = now.tv_usec;
1009 stats->remote_sec = WPA_GET_BE32(&pkt[12]);
1010 stats->remote_usec = WPA_GET_BE32(&pkt[16]);
1011 }
1012 }
1013 }
1014
1015 free(pkt);
1016}
1017
1018
1019static void receive_transaction(struct sigma_stream *s)
1020{
1021 struct timeval tv;
1022 fd_set rfds;
1023 int res;
1024 char *pkt;
1025 int pktlen;
1026 unsigned int last_rx = 0, counter;
1027 struct sockaddr_in addr;
1028 socklen_t addrlen;
1029
1030 if (s->payload_size)
1031 pktlen = s->payload_size;
1032 else
1033 pktlen = 65536 + 1;
1034 pkt = malloc(pktlen);
1035 if (pkt == NULL)
1036 return;
1037
1038 while (!s->stop) {
1039 FD_ZERO(&rfds);
1040 FD_SET(s->sock, &rfds);
1041 tv.tv_sec = 0;
1042 tv.tv_usec = 300000;
1043 res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
1044 if (res < 0) {
1045 perror("select");
1046 usleep(10000);
1047 } else if (FD_ISSET(s->sock, &rfds)) {
1048 addrlen = sizeof(addr);
1049 res = recvfrom(s->sock, pkt, pktlen, 0,
1050 (struct sockaddr *) &addr, &addrlen);
1051 if (res < 0) {
1052 perror("recv");
1053 break;
1054 }
1055
1056 s->rx_frames++;
1057 s->rx_payload_bytes += res;
1058
1059 counter = WPA_GET_BE32(&pkt[8]);
1060 if (counter < last_rx)
1061 s->out_of_seq_frames++;
1062 last_rx = counter;
1063
1064 /* send response */
1065 res = sendto(s->sock, pkt, pktlen, 0,
1066 (struct sockaddr *) &addr, addrlen);
1067 if (res < 0) {
1068 perror("sendto");
1069 } else {
1070 s->tx_frames++;
1071 s->tx_payload_bytes += res;
1072 }
1073 }
1074 }
1075
1076 free(pkt);
1077}
1078
1079
1080static void * receive_thread(void *ctx)
1081{
1082 struct sigma_stream *s = ctx;
1083
1084 if (s->trans_proto == IPPROTO_TCP) {
1085 /* Wait for socket to be accepted */
1086 struct sockaddr_in connected_addr;
1087 int connected_sock; /* returned from accept on sock */
1088 socklen_t connected_addr_len = sizeof(connected_addr);
1089
1090 sigma_dut_print(s->dut, DUT_MSG_DEBUG,
1091 "Traffic agent: Waiting on accept");
1092 connected_sock = accept(s->sock,
1093 (struct sockaddr *) &connected_addr,
1094 &connected_addr_len);
1095 if (connected_sock < 0) {
1096 sigma_dut_print(s->dut, DUT_MSG_ERROR,
1097 "Traffic agent: Failed to accept: %s",
1098 strerror(errno));
1099 return NULL;
1100 }
1101
1102 sigma_dut_print(s->dut, DUT_MSG_DEBUG,
1103 "Traffic agent: Accepted client closing parent socket and talk over connected sock.");
1104 close(s->sock);
1105 s->sock = connected_sock;
1106 }
1107
1108 switch (s->profile) {
1109 case SIGMA_PROFILE_FILE_TRANSFER:
1110 receive_file(s);
1111 break;
1112 case SIGMA_PROFILE_MULTICAST:
1113 receive_file(s);
1114 break;
1115 case SIGMA_PROFILE_IPTV:
1116 receive_file(s);
1117 break;
1118 case SIGMA_PROFILE_TRANSACTION:
1119 receive_transaction(s);
1120 break;
1121 case SIGMA_PROFILE_START_SYNC:
1122 break;
1123 case SIGMA_PROFILE_UAPSD:
1124 receive_uapsd(s);
1125 break;
1126 }
1127
1128 return NULL;
1129}
1130
1131
1132static int cmd_traffic_agent_receive_start(struct sigma_dut *dut,
1133 struct sigma_conn *conn,
1134 struct sigma_cmd *cmd)
1135{
1136 const char *val;
1137 int streams[MAX_SIGMA_STREAMS];
1138 int i, j, count;
1139 char buf[100];
1140
1141 val = get_param(cmd, "streamID");
1142 if (val == NULL)
1143 return -1;
1144 count = get_stream_id(val, streams);
1145 if (count < 0)
1146 return -1;
1147 for (i = 0; i < count; i++) {
1148 struct sigma_stream *s = get_stream(dut, streams[i]);
1149
1150 if (!s) {
1151 snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
1152 "not configured", streams[i]);
1153 send_resp(dut, conn, SIGMA_INVALID, buf);
1154 return 0;
1155 }
1156 for (j = 0; j < i; j++)
1157 if (streams[i] == streams[j])
1158 return -1;
1159 if (s->sender) {
1160 snprintf(buf, sizeof(buf), "errorCode,Not configured "
1161 "as receiver for streamID %d", streams[i]);
1162 send_resp(dut, conn, SIGMA_INVALID, buf);
1163 return 0;
1164 }
1165 }
1166
1167 for (i = 0; i < count; i++) {
1168 struct sigma_stream *s = get_stream(dut, streams[i]);
1169
1170 if (!s)
1171 continue;
1172 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open "
1173 "receive socket for stream %d", streams[i]);
1174 if (open_socket(dut, s) < 0)
1175 return -2;
1176 }
1177
1178 for (i = 0; i < count; i++) {
1179 struct sigma_stream *s = get_stream(dut, streams[i]);
1180 int res;
1181
1182 if (!s)
1183 continue;
1184 /*
1185 * Provide dut context to the thread to support debugging and
1186 * returning of error messages. Similarly, provide interface
1187 * information to the thread.
1188 */
1189 s->dut = dut;
1190 val = get_param(cmd, "Interface");
1191 if (val) {
1192 strncpy(s->ifname, val, sizeof(s->ifname));
1193 s->ifname[sizeof(s->ifname) - 1] = '\0';
1194 }
1195
1196 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start "
1197 "receive for stream %d", streams[i]);
1198 res = pthread_create(&s->thr, NULL, receive_thread, s);
1199 if (res) {
1200 sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create "
1201 "failed: %d", res);
1202 return -2;
1203 }
1204 s->started = 1;
1205 }
1206
1207 return 1;
1208}
1209
1210
1211static void write_frame_stats(struct sigma_dut *dut, struct sigma_stream *s,
1212 int id)
1213{
1214 char fname[128];
1215 FILE *f;
1216 unsigned int i;
1217
1218 snprintf(fname, sizeof(fname), SIGMA_TMPDIR "/e2e%u-%d.txt",
1219 (unsigned int) time(NULL), id);
1220 f = fopen(fname, "w");
1221 if (f == NULL) {
1222 sigma_dut_print(dut, DUT_MSG_INFO, "Could not write %s",
1223 fname);
1224 return;
1225 }
1226 fprintf(f, "seqnum:local_sec:local_usec:remote_sec:remote_usec\n");
1227
1228 sigma_dut_print(dut, DUT_MSG_DEBUG, "Writing frame stats to %s",
1229 fname);
1230
1231 for (i = 0; i < s->num_stats; i++) {
1232 struct sigma_frame_stats *stats = &s->stats[i];
1233 fprintf(f, "%u:%u:%u:%u:%u\n", stats->seqnum,
1234 stats->local_sec, stats->local_usec,
1235 stats->remote_sec, stats->remote_usec);
1236 }
1237
1238 fclose(f);
1239}
1240
1241
1242static int cmd_traffic_agent_receive_stop(struct sigma_dut *dut,
1243 struct sigma_conn *conn,
1244 struct sigma_cmd *cmd)
1245{
1246 const char *val;
1247 int streams[MAX_SIGMA_STREAMS];
1248 int i, j, ret, count;
1249 char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
1250
1251 val = get_param(cmd, "streamID");
1252 if (val == NULL)
1253 return -1;
1254 count = get_stream_id(val, streams);
1255 if (count < 0)
1256 return -1;
1257 for (i = 0; i < count; i++) {
1258 struct sigma_stream *s = get_stream(dut, streams[i]);
1259
1260 if (!s) {
1261 snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
1262 "not configured", streams[i]);
1263 send_resp(dut, conn, SIGMA_INVALID, buf);
1264 return 0;
1265 }
1266 for (j = 0; j < i; j++)
1267 if (streams[i] == streams[j])
1268 return -1;
1269 if (!s->started) {
1270 snprintf(buf, sizeof(buf), "errorCode,Receive not "
1271 "started for streamID %d", streams[i]);
1272 send_resp(dut, conn, SIGMA_INVALID, buf);
1273 return 0;
1274 }
1275 }
1276
1277 for (i = 0; i < count; i++) {
1278 struct sigma_stream *s = get_stream(dut, streams[i]);
1279
1280 if (s)
1281 s->stop = 1;
1282 }
1283
1284 for (i = 0; i < count; i++) {
1285 struct sigma_stream *s = get_stream(dut, streams[i]);
1286
1287 if (!s)
1288 continue;
1289 sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: stop "
1290 "receive for stream %d", streams[i]);
1291 stop_stream(s);
1292 }
1293
1294 buf[0] = '\0';
1295 pos = buf;
1296
1297 pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
1298 for (i = 0; i < count; i++) {
1299 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1300 i > 0 ? " " : "", streams[i]);
1301 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1302 break;
1303 pos += ret;
1304 }
1305
1306 if (dut->program == PROGRAM_60GHZ) {
1307 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
1308 for (i = 0; i < count; i++) {
1309 struct sigma_stream *s = get_stream(dut, streams[i]);
1310
1311 if (!s)
1312 continue;
1313 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1314 i > 0 ? " " : "", s->tx_act_frames);
1315 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1316 break;
1317 pos += ret;
1318 }
1319 }
1320
1321 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
1322 for (i = 0; i < count; i++) {
1323 struct sigma_stream *s = get_stream(dut, streams[i]);
1324
1325 if (!s)
1326 continue;
1327 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1328 i > 0 ? " " : "", s->tx_frames);
1329 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1330 break;
1331 pos += ret;
1332 }
1333
1334 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
1335 for (i = 0; i < count; i++) {
1336 struct sigma_stream *s = get_stream(dut, streams[i]);
1337
1338 if (!s)
1339 continue;
1340 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1341 i > 0 ? " " : "", s->rx_frames);
1342 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1343 break;
1344 pos += ret;
1345 }
1346
1347 pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
1348 for (i = 0; i < count; i++) {
1349 struct sigma_stream *s = get_stream(dut, streams[i]);
1350
1351 if (!s)
1352 continue;
1353 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
1354 i > 0 ? " " : "", s->tx_payload_bytes);
1355 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1356 break;
1357 pos += ret;
1358 }
1359
1360 pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
1361 for (i = 0; i < count; i++) {
1362 struct sigma_stream *s = get_stream(dut, streams[i]);
1363
1364 if (!s)
1365 continue;
1366 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
1367 i > 0 ? " " : "", s->rx_payload_bytes);
1368 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1369 break;
1370 pos += ret;
1371 }
1372
1373 pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
1374 for (i = 0; i < count; i++) {
1375 struct sigma_stream *s = get_stream(dut, streams[i]);
1376
1377 if (!s)
1378 continue;
1379 ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
1380 i > 0 ? " " : "", s->out_of_seq_frames);
1381 if (ret < 0 || ret >= buf + sizeof(buf) - pos)
1382 break;
1383 pos += ret;
1384 }
1385
1386 buf[sizeof(buf) - 1] = '\0';
1387
1388 send_resp(dut, conn, SIGMA_COMPLETE, buf);
1389
1390 for (i = 0; i < count; i++) {
1391 struct sigma_stream *s = get_stream(dut, streams[i]);
1392
1393 if (!s)
1394 continue;
1395 if (s->profile == SIGMA_PROFILE_IPTV && s->num_stats > 0 &&
1396 dut->write_stats)
1397 write_frame_stats(dut, s, streams[i]);
1398 free(s->stats);
1399 s->stats = NULL;
1400 s->num_stats = 0;
1401 }
1402
1403 return 0;
1404}
1405
1406
1407static int cmd_traffic_agent_version(struct sigma_dut *dut,
1408 struct sigma_conn *conn,
1409 struct sigma_cmd *cmd)
1410{
1411 send_resp(dut, conn, SIGMA_COMPLETE, "version,1.0");
1412 return 0;
1413}
1414
1415
1416void traffic_agent_register_cmds(void)
1417{
1418 sigma_dut_reg_cmd("traffic_agent_config", NULL,
1419 cmd_traffic_agent_config);
1420 sigma_dut_reg_cmd("traffic_agent_reset", NULL,
1421 cmd_traffic_agent_reset);
1422 sigma_dut_reg_cmd("traffic_agent_send", NULL,
1423 cmd_traffic_agent_send);
1424 sigma_dut_reg_cmd("traffic_agent_receive_start", NULL,
1425 cmd_traffic_agent_receive_start);
1426 sigma_dut_reg_cmd("traffic_agent_receive_stop", NULL,
1427 cmd_traffic_agent_receive_stop);
1428 sigma_dut_reg_cmd("traffic_agent_version", NULL,
1429 cmd_traffic_agent_version);
1430}