blob: 68065c65c57d466a7efe74dcad4c46b3b4d67700 [file] [log] [blame]
Jarkko Poyry3c827362014-09-02 11:48:52 +03001/*-------------------------------------------------------------------------
2 * drawElements Quality Program Test Executor
3 * ------------------------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Tcp/Ip communication link.
22 *//*--------------------------------------------------------------------*/
23
24#include "xeTcpIpLink.hpp"
25#include "xsProtocol.hpp"
26#include "deClock.h"
27#include "deInt32.h"
28
29namespace xe
30{
31
32enum
33{
34 SEND_BUFFER_BLOCK_SIZE = 1024,
35 SEND_BUFFER_NUM_BLOCKS = 64
36};
37
38// Utilities for writing messages out.
39
40static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
41{
42 deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
43 xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
44 dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
45}
46
47static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
48{
49 writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
50 dst.flush();
51}
52
53static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
54{
55 int nameSize = (int)strlen(name) + 1;
56 int paramsSize = (int)strlen(params) + 1;
57 int workDirSize = (int)strlen(workDir) + 1;
58 int caseListSize = (int)strlen(caseList) + 1;
59 int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
60
61 writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
62 dst.write(nameSize, (const deUint8*)name);
63 dst.write(paramsSize, (const deUint8*)params);
64 dst.write(workDirSize, (const deUint8*)workDir);
65 dst.write(caseListSize, (const deUint8*)caseList);
66 dst.flush();
67}
68
69static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
70{
71 writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
72 dst.flush();
73}
74
75// TcpIpLinkState
76
77TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
78 : m_state (initialState)
79 , m_error (initialErr)
80 , m_lastKeepaliveReceived (0)
81 , m_stateChangedCallback (DE_NULL)
82 , m_testLogDataCallback (DE_NULL)
83 , m_infoLogDataCallback (DE_NULL)
84 , m_userPtr (DE_NULL)
85{
86}
87
88TcpIpLinkState::~TcpIpLinkState (void)
89{
90}
91
92CommLinkState TcpIpLinkState::getState (void) const
93{
94 de::ScopedLock lock(m_lock);
95
96 return m_state;
97}
98
99CommLinkState TcpIpLinkState::getState (std::string& error) const
100{
101 de::ScopedLock lock(m_lock);
102
103 error = m_error;
104 return m_state;
105}
106
107void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
108{
109 de::ScopedLock lock(m_lock);
110
111 m_stateChangedCallback = stateChangedCallback;
112 m_testLogDataCallback = testLogDataCallback;
113 m_infoLogDataCallback = infoLogDataCallback;
114 m_userPtr = userPtr;
115}
116
117void TcpIpLinkState::setState (CommLinkState state, const char* error)
118{
119 CommLink::StateChangedFunc callback = DE_NULL;
120 void* userPtr = DE_NULL;
121
122 {
123 de::ScopedLock lock(m_lock);
124
125 m_state = state;
126 m_error = error;
127
128 callback = m_stateChangedCallback;
129 userPtr = m_userPtr;
130 }
131
132 if (callback)
133 callback(userPtr, state, error);
134}
135
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700136void TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const
Jarkko Poyry3c827362014-09-02 11:48:52 +0300137{
138 CommLink::LogDataFunc callback = DE_NULL;
139 void* userPtr = DE_NULL;
140
141 m_lock.lock();
142 callback = m_testLogDataCallback;
143 userPtr = m_userPtr;
144 m_lock.unlock();
145
146 if (callback)
147 callback(userPtr, bytes, numBytes);
148}
149
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700150void TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const
Jarkko Poyry3c827362014-09-02 11:48:52 +0300151{
152 CommLink::LogDataFunc callback = DE_NULL;
153 void* userPtr = DE_NULL;
154
155 m_lock.lock();
156 callback = m_infoLogDataCallback;
157 userPtr = m_userPtr;
158 m_lock.unlock();
159
160 if (callback)
161 callback(userPtr, bytes, numBytes);
162}
163
164void TcpIpLinkState::onKeepaliveReceived (void)
165{
166 de::ScopedLock lock(m_lock);
167 m_lastKeepaliveReceived = deGetMicroseconds();
168}
169
170deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
171{
172 de::ScopedLock lock(m_lock);
173 return m_lastKeepaliveReceived;
174}
175
176// TcpIpSendThread
177
178TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
179 : m_socket (socket)
180 , m_state (state)
181 , m_buffer (SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
182 , m_isRunning (false)
183{
184}
185
186TcpIpSendThread::~TcpIpSendThread (void)
187{
188}
189
190void TcpIpSendThread::start (void)
191{
192 DE_ASSERT(!m_isRunning);
193
194 // Reset state.
195 m_buffer.clear();
196 m_isRunning = true;
197
198 de::Thread::start();
199}
200
201void TcpIpSendThread::run (void)
202{
203 try
204 {
205 deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
206
207 while (!m_buffer.isCanceled())
208 {
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700209 size_t numToSend = 0;
210 size_t numSent = 0;
Jarkko Poyry3c827362014-09-02 11:48:52 +0300211 deSocketResult result = DE_SOCKETRESULT_LAST;
212
213 try
214 {
215 // Wait for single byte and then try to read more.
216 m_buffer.read(1, &buf[0]);
217 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
218 }
219 catch (const de::BlockBuffer<deUint8>::CanceledException&)
220 {
221 // Handled in loop condition.
222 }
223
224 while (numSent < numToSend)
225 {
226 result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
227
228 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
229 XE_FAIL("Connection closed");
230 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
231 XE_FAIL("Connection terminated");
232 else if (result == DE_SOCKETRESULT_ERROR)
233 XE_FAIL("Socket error");
234 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
235 {
236 // \note Socket should not be in non-blocking mode.
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700237 DE_ASSERT(numSent == 0);
Jarkko Poyry3c827362014-09-02 11:48:52 +0300238 deYield();
239 }
240 else
241 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
242 }
243 }
244 }
245 catch (const std::exception& e)
246 {
247 m_state.setState(COMMLINKSTATE_ERROR, e.what());
248 }
249}
250
251void TcpIpSendThread::stop (void)
252{
253 if (m_isRunning)
254 {
255 m_buffer.cancel();
256 join();
257 m_isRunning = false;
258 }
259}
260
261// TcpIpRecvThread
262
263TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
264 : m_socket (socket)
265 , m_state (state)
266 , m_curMsgPos (0)
267 , m_isRunning (false)
268{
269}
270
271TcpIpRecvThread::~TcpIpRecvThread (void)
272{
273}
274
275void TcpIpRecvThread::start (void)
276{
277 DE_ASSERT(!m_isRunning);
278
279 // Reset state.
280 m_curMsgPos = 0;
281 m_isRunning = true;
282
283 de::Thread::start();
284}
285
286void TcpIpRecvThread::run (void)
287{
288 try
289 {
290 for (;;)
291 {
292 bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
293 bool hasPayload = false;
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700294 size_t messageSize = 0;
Jarkko Poyry3c827362014-09-02 11:48:52 +0300295 xs::MessageType messageType = (xs::MessageType)0;
296
297 if (hasHeader)
298 {
299 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
300 hasPayload = m_curMsgPos >= messageSize;
301 }
302
303 if (hasPayload)
304 {
305 // Process message.
306 handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
307 m_curMsgPos = 0;
308 }
309 else
310 {
311 // Try to receive missing bytes.
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700312 size_t curSize = hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
313 size_t bytesToRecv = curSize-m_curMsgPos;
314 size_t numRecv = 0;
Jarkko Poyry3c827362014-09-02 11:48:52 +0300315 deSocketResult result = DE_SOCKETRESULT_LAST;
316
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700317 if (m_curMsgBuf.size() < curSize)
Jarkko Poyry3c827362014-09-02 11:48:52 +0300318 m_curMsgBuf.resize(curSize);
319
320 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
321
322 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
323 XE_FAIL("Connection closed");
324 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
325 XE_FAIL("Connection terminated");
326 else if (result == DE_SOCKETRESULT_ERROR)
327 XE_FAIL("Socket error");
328 else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
329 {
330 // \note Socket should not be in non-blocking mode.
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700331 DE_ASSERT(numRecv == 0);
Jarkko Poyry3c827362014-09-02 11:48:52 +0300332 deYield();
333 }
334 else
335 {
336 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
337 DE_ASSERT(numRecv <= bytesToRecv);
338 m_curMsgPos += numRecv;
339 // Continue receiving bytes / handle message in next iter.
340 }
341 }
342 }
343 }
344 catch (const std::exception& e)
345 {
346 m_state.setState(COMMLINKSTATE_ERROR, e.what());
347 }
348}
349
350void TcpIpRecvThread::stop (void)
351{
352 if (m_isRunning)
353 {
354 // \note Socket must be closed before terminating receive thread.
355 XE_CHECK(!m_socket.isReceiveOpen());
356
357 join();
358 m_isRunning = false;
359 }
360}
361
Jarkko Pöyry745d7c62015-05-19 12:24:51 -0700362void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize)
Jarkko Poyry3c827362014-09-02 11:48:52 +0300363{
364 switch (messageType)
365 {
366 case xs::MESSAGETYPE_KEEPALIVE:
367 m_state.onKeepaliveReceived();
368 break;
369
370 case xs::MESSAGETYPE_PROCESS_STARTED:
371 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
372 m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
373 break;
374
375 case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
376 {
377 xs::ProcessLaunchFailedMessage msg(data, dataSize);
378 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
379 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
380 break;
381 }
382
383 case xs::MESSAGETYPE_PROCESS_FINISHED:
384 {
385 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
386 xs::ProcessFinishedMessage msg(data, dataSize);
387 m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
388 DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
389 break;
390 }
391
392 case xs::MESSAGETYPE_PROCESS_LOG_DATA:
393 case xs::MESSAGETYPE_INFO:
394 // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
395 if (data[dataSize-1] == 0)
396 dataSize -= 1;
397
398 if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
399 {
400 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
401 m_state.onTestLogData(&data[0], dataSize);
402 }
403 else
404 m_state.onInfoLogData(&data[0], dataSize);
405 break;
406
407 default:
408 XE_FAIL("Unknown message");
409 }
410}
411
412// TcpIpLink
413
414TcpIpLink::TcpIpLink (void)
415 : m_state (COMMLINKSTATE_ERROR, "Not connected")
416 , m_sendThread (m_socket, m_state)
417 , m_recvThread (m_socket, m_state)
418 , m_keepaliveTimer (DE_NULL)
419{
420 m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
421 XE_CHECK(m_keepaliveTimer);
422}
423
424TcpIpLink::~TcpIpLink (void)
425{
426 try
427 {
428 closeConnection();
429 }
430 catch (...)
431 {
432 // Can't do much except to ignore error.
433 }
434 deTimer_destroy(m_keepaliveTimer);
435}
436
437void TcpIpLink::closeConnection (void)
438{
439 {
440 deSocketState state = m_socket.getState();
441 if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
442 m_socket.shutdown();
443 }
444
445 if (deTimer_isActive(m_keepaliveTimer))
446 deTimer_disable(m_keepaliveTimer);
447
448 if (m_sendThread.isRunning())
449 m_sendThread.stop();
450
451 if (m_recvThread.isRunning())
452 m_recvThread.stop();
453
454 if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
455 m_socket.close();
456}
457
458void TcpIpLink::connect (const de::SocketAddress& address)
459{
460 XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
461 XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
462 XE_CHECK(!m_sendThread.isRunning());
463 XE_CHECK(!m_recvThread.isRunning());
464
465 m_socket.connect(address);
466
467 try
468 {
469 // Clear error and set state to ready.
470 m_state.setState(COMMLINKSTATE_READY, "");
471 m_state.onKeepaliveReceived();
472
473 // Launch threads.
474 m_sendThread.start();
475 m_recvThread.start();
476
477 XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
478 }
479 catch (const std::exception& e)
480 {
481 closeConnection();
482 m_state.setState(COMMLINKSTATE_ERROR, e.what());
Mika Isojärvie16b0f52015-05-18 15:32:39 -0700483 throw;
Jarkko Poyry3c827362014-09-02 11:48:52 +0300484 }
485}
486
487void TcpIpLink::disconnect (void)
488{
489 try
490 {
491 closeConnection();
492 m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
493 }
494 catch (const std::exception& e)
495 {
496 m_state.setState(COMMLINKSTATE_ERROR, e.what());
497 }
498}
499
500void TcpIpLink::reset (void)
501{
502 // \note Just clears error state if we are connected.
503 if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
504 {
505 m_state.setState(COMMLINKSTATE_READY, "");
506
507 // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
508 }
509 else
510 disconnect(); // Abnormal state/usage. Disconnect socket.
511}
512
513void TcpIpLink::keepaliveTimerCallback (void* ptr)
514{
515 TcpIpLink* link = static_cast<TcpIpLink*>(ptr);
516 deUint64 lastKeepalive = link->m_state.getLastKeepaliveRecevied();
517 deUint64 curTime = deGetMicroseconds();
518
519 // Check for timeout.
520 if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
521 link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
522
523 // Enqueue new keepalive.
524 try
525 {
526 writeKeepalive(link->m_sendThread.getBuffer());
527 }
528 catch (const de::BlockBuffer<deUint8>::CanceledException&)
529 {
530 // Ignore. Can happen in connection teardown.
531 }
532}
533
534CommLinkState TcpIpLink::getState (void) const
535{
536 return m_state.getState();
537}
538
539CommLinkState TcpIpLink::getState (std::string& message) const
540{
541 return m_state.getState(message);
542}
543
544void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
545{
546 m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
547}
548
549void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
550{
551 XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
552
553 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
554 writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
555}
556
557void TcpIpLink::stopTestProcess (void)
558{
559 XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
560 writeStopExecution(m_sendThread.getBuffer());
561}
562
563} // xe