blob: 5ec03b2e09ab36b22e7ddb6bad4328e5d0a5c040 [file] [log] [blame]
Andreas Huber7a747b82010-06-07 15:19:40 -07001/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Andreas Huber6e3fa442010-09-21 13:13:15 -070017//#define LOG_NDEBUG 0
18#define LOG_TAG "ARTSPConnection"
19#include <utils/Log.h>
20
Andreas Huber7a747b82010-06-07 15:19:40 -070021#include "ARTSPConnection.h"
22
23#include <media/stagefright/foundation/ABuffer.h>
24#include <media/stagefright/foundation/ADebug.h>
25#include <media/stagefright/foundation/AMessage.h>
Andreas Huber0416da72010-08-26 11:17:32 -070026#include <media/stagefright/MediaErrors.h>
Andreas Huber7a747b82010-06-07 15:19:40 -070027
28#include <arpa/inet.h>
29#include <fcntl.h>
30#include <netdb.h>
31#include <sys/socket.h>
32
33namespace android {
34
35// static
36const int64_t ARTSPConnection::kSelectTimeoutUs = 1000ll;
37
38ARTSPConnection::ARTSPConnection()
39 : mState(DISCONNECTED),
40 mSocket(-1),
41 mConnectionID(0),
42 mNextCSeq(0),
43 mReceiveResponseEventPending(false) {
44}
45
46ARTSPConnection::~ARTSPConnection() {
47 if (mSocket >= 0) {
Andreas Huber6e3fa442010-09-21 13:13:15 -070048 LOGE("Connection is still open, closing the socket.");
Andreas Huber7a747b82010-06-07 15:19:40 -070049 close(mSocket);
50 mSocket = -1;
51 }
52}
53
54void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) {
55 sp<AMessage> msg = new AMessage(kWhatConnect, id());
56 msg->setString("url", url);
57 msg->setMessage("reply", reply);
58 msg->post();
59}
60
61void ARTSPConnection::disconnect(const sp<AMessage> &reply) {
62 sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
63 msg->setMessage("reply", reply);
64 msg->post();
65}
66
67void ARTSPConnection::sendRequest(
68 const char *request, const sp<AMessage> &reply) {
69 sp<AMessage> msg = new AMessage(kWhatSendRequest, id());
70 msg->setString("request", request);
71 msg->setMessage("reply", reply);
72 msg->post();
73}
74
Andreas Huber0416da72010-08-26 11:17:32 -070075void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) {
76 sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id());
77 msg->setMessage("reply", reply);
78 msg->post();
79}
80
Andreas Huber7a747b82010-06-07 15:19:40 -070081void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
82 switch (msg->what()) {
83 case kWhatConnect:
84 onConnect(msg);
85 break;
86
87 case kWhatDisconnect:
88 onDisconnect(msg);
89 break;
90
91 case kWhatCompleteConnection:
92 onCompleteConnection(msg);
93 break;
94
95 case kWhatSendRequest:
96 onSendRequest(msg);
97 break;
98
99 case kWhatReceiveResponse:
100 onReceiveResponse();
101 break;
102
Andreas Huber0416da72010-08-26 11:17:32 -0700103 case kWhatObserveBinaryData:
104 {
105 CHECK(msg->findMessage("reply", &mObserveBinaryMessage));
106 break;
107 }
108
Andreas Huber7a747b82010-06-07 15:19:40 -0700109 default:
110 TRESPASS();
111 break;
112 }
113}
114
115// static
116bool ARTSPConnection::ParseURL(
117 const char *url, AString *host, unsigned *port, AString *path) {
118 host->clear();
119 *port = 0;
120 path->clear();
121
122 if (strncasecmp("rtsp://", url, 7)) {
123 return false;
124 }
125
126 const char *slashPos = strchr(&url[7], '/');
127
128 if (slashPos == NULL) {
129 host->setTo(&url[7]);
130 path->setTo("/");
131 } else {
132 host->setTo(&url[7], slashPos - &url[7]);
133 path->setTo(slashPos);
134 }
135
136 char *colonPos = strchr(host->c_str(), ':');
137
138 if (colonPos != NULL) {
139 unsigned long x;
140 if (!ParseSingleUnsignedLong(colonPos + 1, &x) || x >= 65536) {
141 return false;
142 }
143
144 *port = x;
145
146 size_t colonOffset = colonPos - host->c_str();
147 size_t trailing = host->size() - colonOffset;
148 host->erase(colonOffset, trailing);
149 } else {
150 *port = 554;
151 }
152
153 return true;
154}
155
Andreas Huberaf063a62010-08-18 10:17:18 -0700156static void MakeSocketBlocking(int s, bool blocking) {
157 // Make socket non-blocking.
158 int flags = fcntl(s, F_GETFL, 0);
159 CHECK_NE(flags, -1);
160
161 if (blocking) {
162 flags &= ~O_NONBLOCK;
163 } else {
164 flags |= O_NONBLOCK;
165 }
166
167 CHECK_NE(fcntl(s, F_SETFL, flags), -1);
168}
169
Andreas Huber7a747b82010-06-07 15:19:40 -0700170void ARTSPConnection::onConnect(const sp<AMessage> &msg) {
171 ++mConnectionID;
172
173 if (mState != DISCONNECTED) {
174 close(mSocket);
175 mSocket = -1;
176
177 flushPendingRequests();
178 }
179
180 mState = CONNECTING;
181
Andreas Huber7a747b82010-06-07 15:19:40 -0700182 AString url;
183 CHECK(msg->findString("url", &url));
184
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700185 sp<AMessage> reply;
186 CHECK(msg->findMessage("reply", &reply));
187
Andreas Huber7a747b82010-06-07 15:19:40 -0700188 AString host, path;
189 unsigned port;
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700190 if (!ParseURL(url.c_str(), &host, &port, &path)) {
Andreas Huber6e3fa442010-09-21 13:13:15 -0700191 LOGE("Malformed rtsp url %s", url.c_str());
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700192
193 reply->setInt32("result", ERROR_MALFORMED);
194 reply->post();
195
196 mState = DISCONNECTED;
197 return;
198 }
Andreas Huber7a747b82010-06-07 15:19:40 -0700199
200 struct hostent *ent = gethostbyname(host.c_str());
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700201 if (ent == NULL) {
Andreas Huber6e3fa442010-09-21 13:13:15 -0700202 LOGE("Unknown host %s", host.c_str());
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700203
204 reply->setInt32("result", -ENOENT);
205 reply->post();
206
207 mState = DISCONNECTED;
208 return;
209 }
210
211 mSocket = socket(AF_INET, SOCK_STREAM, 0);
212
213 MakeSocketBlocking(mSocket, false);
Andreas Huber7a747b82010-06-07 15:19:40 -0700214
215 struct sockaddr_in remote;
216 memset(remote.sin_zero, 0, sizeof(remote.sin_zero));
217 remote.sin_family = AF_INET;
218 remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
219 remote.sin_port = htons(port);
220
221 int err = ::connect(
222 mSocket, (const struct sockaddr *)&remote, sizeof(remote));
223
Andreas Huber7a747b82010-06-07 15:19:40 -0700224 reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr));
225
226 if (err < 0) {
227 if (errno == EINPROGRESS) {
228 sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id());
229 msg->setMessage("reply", reply);
230 msg->setInt32("connection-id", mConnectionID);
231 msg->post();
232 return;
233 }
234
235 reply->setInt32("result", -errno);
236 mState = DISCONNECTED;
237
238 close(mSocket);
239 mSocket = -1;
240 } else {
241 reply->setInt32("result", OK);
242 mState = CONNECTED;
243 mNextCSeq = 1;
244
245 postReceiveReponseEvent();
246 }
247
248 reply->post();
249}
250
251void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) {
252 if (mState == CONNECTED || mState == CONNECTING) {
253 close(mSocket);
254 mSocket = -1;
255
256 flushPendingRequests();
Andreas Huberaf063a62010-08-18 10:17:18 -0700257 }
Andreas Huber7a747b82010-06-07 15:19:40 -0700258
259 sp<AMessage> reply;
260 CHECK(msg->findMessage("reply", &reply));
261
262 reply->setInt32("result", OK);
263 mState = DISCONNECTED;
264
265 reply->post();
266}
267
268void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) {
269 sp<AMessage> reply;
270 CHECK(msg->findMessage("reply", &reply));
271
272 int32_t connectionID;
273 CHECK(msg->findInt32("connection-id", &connectionID));
274
275 if ((connectionID != mConnectionID) || mState != CONNECTING) {
276 // While we were attempting to connect, the attempt was
277 // cancelled.
278 reply->setInt32("result", -ECONNABORTED);
279 reply->post();
280 return;
281 }
282
283 struct timeval tv;
284 tv.tv_sec = 0;
285 tv.tv_usec = kSelectTimeoutUs;
286
287 fd_set ws;
288 FD_ZERO(&ws);
289 FD_SET(mSocket, &ws);
290
291 int res = select(mSocket + 1, NULL, &ws, NULL, &tv);
292 CHECK_GE(res, 0);
293
294 if (res == 0) {
295 // Timed out. Not yet connected.
296
297 msg->post();
298 return;
299 }
300
301 int err;
302 socklen_t optionLen = sizeof(err);
303 CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
304 CHECK_EQ(optionLen, (socklen_t)sizeof(err));
305
306 if (err != 0) {
Andreas Huber6e3fa442010-09-21 13:13:15 -0700307 LOGE("err = %d (%s)", err, strerror(err));
Andreas Huber7a747b82010-06-07 15:19:40 -0700308
309 reply->setInt32("result", -err);
310
311 mState = DISCONNECTED;
312 close(mSocket);
313 mSocket = -1;
314 } else {
315 reply->setInt32("result", OK);
316 mState = CONNECTED;
317 mNextCSeq = 1;
318
319 postReceiveReponseEvent();
320 }
321
322 reply->post();
323}
324
325void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) {
326 sp<AMessage> reply;
327 CHECK(msg->findMessage("reply", &reply));
328
329 if (mState != CONNECTED) {
330 reply->setInt32("result", -ENOTCONN);
331 reply->post();
332 return;
333 }
334
335 AString request;
336 CHECK(msg->findString("request", &request));
337
338 // Find the boundary between headers and the body.
339 ssize_t i = request.find("\r\n\r\n");
340 CHECK_GE(i, 0);
341
342 int32_t cseq = mNextCSeq++;
343
344 AString cseqHeader = "CSeq: ";
345 cseqHeader.append(cseq);
346 cseqHeader.append("\r\n");
347
348 request.insert(cseqHeader, i + 2);
349
Andreas Huber6e3fa442010-09-21 13:13:15 -0700350 LOGV("%s", request.c_str());
Andreas Huber7a747b82010-06-07 15:19:40 -0700351
352 size_t numBytesSent = 0;
353 while (numBytesSent < request.size()) {
354 ssize_t n =
355 send(mSocket, request.c_str() + numBytesSent,
356 request.size() - numBytesSent, 0);
357
358 if (n == 0) {
359 // Server closed the connection.
Andreas Huber6e3fa442010-09-21 13:13:15 -0700360 LOGE("Server unexpectedly closed the connection.");
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700361
362 reply->setInt32("result", ERROR_IO);
363 reply->post();
364 return;
Andreas Huber7a747b82010-06-07 15:19:40 -0700365 } else if (n < 0) {
366 if (errno == EINTR) {
367 continue;
368 }
369
Andreas Huber6e3fa442010-09-21 13:13:15 -0700370 LOGE("Error sending rtsp request.");
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700371 reply->setInt32("result", -errno);
372 reply->post();
373 return;
Andreas Huber7a747b82010-06-07 15:19:40 -0700374 }
375
376 numBytesSent += (size_t)n;
377 }
378
379 mPendingRequests.add(cseq, reply);
380}
381
382void ARTSPConnection::onReceiveResponse() {
383 mReceiveResponseEventPending = false;
384
385 if (mState != CONNECTED) {
386 return;
387 }
388
389 struct timeval tv;
390 tv.tv_sec = 0;
391 tv.tv_usec = kSelectTimeoutUs;
392
393 fd_set rs;
394 FD_ZERO(&rs);
395 FD_SET(mSocket, &rs);
396
397 int res = select(mSocket + 1, &rs, NULL, NULL, &tv);
398 CHECK_GE(res, 0);
399
400 if (res == 1) {
Andreas Huberaf063a62010-08-18 10:17:18 -0700401 MakeSocketBlocking(mSocket, true);
402
403 bool success = receiveRTSPReponse();
404
405 MakeSocketBlocking(mSocket, false);
406
407 if (!success) {
Andreas Huber7a747b82010-06-07 15:19:40 -0700408 // Something horrible, irreparable has happened.
409 flushPendingRequests();
410 return;
411 }
412 }
413
414 postReceiveReponseEvent();
415}
416
417void ARTSPConnection::flushPendingRequests() {
418 for (size_t i = 0; i < mPendingRequests.size(); ++i) {
419 sp<AMessage> reply = mPendingRequests.valueAt(i);
420
421 reply->setInt32("result", -ECONNABORTED);
422 reply->post();
423 }
424
425 mPendingRequests.clear();
426}
427
428void ARTSPConnection::postReceiveReponseEvent() {
429 if (mReceiveResponseEventPending) {
430 return;
431 }
432
433 sp<AMessage> msg = new AMessage(kWhatReceiveResponse, id());
434 msg->post();
435
436 mReceiveResponseEventPending = true;
437}
438
Andreas Huber0416da72010-08-26 11:17:32 -0700439status_t ARTSPConnection::receive(void *data, size_t size) {
440 size_t offset = 0;
441 while (offset < size) {
442 ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0);
Andreas Huber7a747b82010-06-07 15:19:40 -0700443 if (n == 0) {
444 // Server closed the connection.
Andreas Huber6e3fa442010-09-21 13:13:15 -0700445 LOGE("Server unexpectedly closed the connection.");
Andreas Huber0416da72010-08-26 11:17:32 -0700446 return ERROR_IO;
Andreas Huber7a747b82010-06-07 15:19:40 -0700447 } else if (n < 0) {
448 if (errno == EINTR) {
449 continue;
450 }
451
Andreas Huber6e3fa442010-09-21 13:13:15 -0700452 LOGE("Error reading rtsp response.");
Andreas Huberf3d2bdf2010-09-15 11:18:13 -0700453 return -errno;
Andreas Huber7a747b82010-06-07 15:19:40 -0700454 }
455
Andreas Huber0416da72010-08-26 11:17:32 -0700456 offset += (size_t)n;
457 }
458
459 return OK;
460}
461
462bool ARTSPConnection::receiveLine(AString *line) {
463 line->clear();
464
465 bool sawCR = false;
466 for (;;) {
467 char c;
468 if (receive(&c, 1) != OK) {
469 return false;
470 }
471
Andreas Huber7a747b82010-06-07 15:19:40 -0700472 if (sawCR && c == '\n') {
473 line->erase(line->size() - 1, 1);
474 return true;
475 }
476
477 line->append(&c, 1);
478
Andreas Huber0416da72010-08-26 11:17:32 -0700479 if (c == '$' && line->size() == 1) {
480 // Special-case for interleaved binary data.
481 return true;
482 }
483
Andreas Huber7a747b82010-06-07 15:19:40 -0700484 sawCR = (c == '\r');
485 }
486}
487
Andreas Huber0416da72010-08-26 11:17:32 -0700488sp<ABuffer> ARTSPConnection::receiveBinaryData() {
489 uint8_t x[3];
490 if (receive(x, 3) != OK) {
491 return NULL;
492 }
Andreas Huber7a747b82010-06-07 15:19:40 -0700493
Andreas Huber0416da72010-08-26 11:17:32 -0700494 sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]);
495 if (receive(buffer->data(), buffer->size()) != OK) {
496 return NULL;
497 }
498
499 buffer->meta()->setInt32("index", (int32_t)x[0]);
500
501 return buffer;
502}
503
504bool ARTSPConnection::receiveRTSPReponse() {
505 AString statusLine;
506
507 if (!receiveLine(&statusLine)) {
Andreas Huber7a747b82010-06-07 15:19:40 -0700508 return false;
509 }
510
Andreas Huber0416da72010-08-26 11:17:32 -0700511 if (statusLine == "$") {
512 sp<ABuffer> buffer = receiveBinaryData();
513
514 if (buffer == NULL) {
515 return false;
516 }
517
518 if (mObserveBinaryMessage != NULL) {
519 sp<AMessage> notify = mObserveBinaryMessage->dup();
520 notify->setObject("buffer", buffer);
521 notify->post();
522 } else {
Andreas Huber6e3fa442010-09-21 13:13:15 -0700523 LOGW("received binary data, but no one cares.");
Andreas Huber0416da72010-08-26 11:17:32 -0700524 }
525
526 return true;
527 }
528
529 sp<ARTSPResponse> response = new ARTSPResponse;
530 response->mStatusLine = statusLine;
531
Andreas Huber6e3fa442010-09-21 13:13:15 -0700532 LOGI("status: %s", response->mStatusLine.c_str());
Andreas Huber7a747b82010-06-07 15:19:40 -0700533
534 ssize_t space1 = response->mStatusLine.find(" ");
535 if (space1 < 0) {
536 return false;
537 }
538 ssize_t space2 = response->mStatusLine.find(" ", space1 + 1);
539 if (space2 < 0) {
540 return false;
541 }
542
543 AString statusCodeStr(
544 response->mStatusLine, space1 + 1, space2 - space1 - 1);
545
546 if (!ParseSingleUnsignedLong(
547 statusCodeStr.c_str(), &response->mStatusCode)
548 || response->mStatusCode < 100 || response->mStatusCode > 999) {
549 return false;
550 }
551
552 AString line;
553 for (;;) {
554 if (!receiveLine(&line)) {
555 break;
556 }
557
558 if (line.empty()) {
559 break;
560 }
561
Andreas Huber6e3fa442010-09-21 13:13:15 -0700562 LOGV("line: %s", line.c_str());
Andreas Huber7a747b82010-06-07 15:19:40 -0700563
564 ssize_t colonPos = line.find(":");
565 if (colonPos < 0) {
566 // Malformed header line.
567 return false;
568 }
569
570 AString key(line, 0, colonPos);
571 key.trim();
572 key.tolower();
573
574 line.erase(0, colonPos + 1);
575 line.trim();
576
577 response->mHeaders.add(key, line);
578 }
579
580 unsigned long contentLength = 0;
581
582 ssize_t i = response->mHeaders.indexOfKey("content-length");
583
584 if (i >= 0) {
585 AString value = response->mHeaders.valueAt(i);
586 if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) {
587 return false;
588 }
589 }
590
591 if (contentLength > 0) {
592 response->mContent = new ABuffer(contentLength);
593
594 size_t numBytesRead = 0;
595 while (numBytesRead < contentLength) {
596 ssize_t n = recv(
597 mSocket, response->mContent->data() + numBytesRead,
598 contentLength - numBytesRead, 0);
599
600 if (n == 0) {
601 // Server closed the connection.
602 TRESPASS();
603 } else if (n < 0) {
604 if (errno == EINTR) {
605 continue;
606 }
607
608 TRESPASS();
609 }
610
611 numBytesRead += (size_t)n;
612 }
613 }
614
615 return notifyResponseListener(response);
616}
617
618// static
619bool ARTSPConnection::ParseSingleUnsignedLong(
620 const char *from, unsigned long *x) {
621 char *end;
622 *x = strtoul(from, &end, 10);
623
624 if (end == from || *end != '\0') {
625 return false;
626 }
627
628 return true;
629}
630
631bool ARTSPConnection::notifyResponseListener(
632 const sp<ARTSPResponse> &response) {
633 ssize_t i = response->mHeaders.indexOfKey("cseq");
634
635 if (i < 0) {
636 return true;
637 }
638
639 AString value = response->mHeaders.valueAt(i);
640
641 unsigned long cseq;
642 if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) {
643 return false;
644 }
645
646 i = mPendingRequests.indexOfKey(cseq);
647
648 if (i < 0) {
649 // Unsolicited response?
650 TRESPASS();
651 }
652
653 sp<AMessage> reply = mPendingRequests.valueAt(i);
654 mPendingRequests.removeItemsAt(i);
655
656 reply->setInt32("result", OK);
657 reply->setObject("response", response);
658 reply->post();
659
660 return true;
661}
662
663} // namespace android