Import dEQP.
Import drawElements Quality Program from an internal repository.
Bug: 17388917
Change-Id: Ic109fe4a57e31b2a816113d90fbdf51a43e7abeb
diff --git a/executor/xeTcpIpLink.cpp b/executor/xeTcpIpLink.cpp
new file mode 100644
index 0000000..1778a4f
--- /dev/null
+++ b/executor/xeTcpIpLink.cpp
@@ -0,0 +1,562 @@
+/*-------------------------------------------------------------------------
+ * drawElements Quality Program Test Executor
+ * ------------------------------------------
+ *
+ * Copyright 2014 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *//*!
+ * \file
+ * \brief Tcp/Ip communication link.
+ *//*--------------------------------------------------------------------*/
+
+#include "xeTcpIpLink.hpp"
+#include "xsProtocol.hpp"
+#include "deClock.h"
+#include "deInt32.h"
+
+namespace xe
+{
+
+// Sizing constants for the outgoing byte queue used by TcpIpSendThread.
+enum
+{
+ SEND_BUFFER_BLOCK_SIZE = 1024, // First argument to the send thread's de::BlockBuffer constructor; also the size of the staging array in TcpIpSendThread::run().
+ SEND_BUFFER_NUM_BLOCKS = 64 // Second argument to the send thread's de::BlockBuffer constructor.
+};
+
+// Utilities for writing messages out.
+
+// Serialize a message header for (type, messageSize) into a temporary array
+// and append those bytes to dst. Callers in this file pass a messageSize that
+// includes the header itself. Does not flush dst.
+static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
+{
+	deUint8 headerBytes[xs::MESSAGE_HEADER_SIZE];
+
+	xs::Message::writeHeader(type, messageSize, headerBytes, xs::MESSAGE_HEADER_SIZE);
+	dst.write(xs::MESSAGE_HEADER_SIZE, headerBytes);
+}
+
+// Queue a KEEPALIVE message, which consists of a header only, and flush dst.
+static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
+{
+	const int headerOnlySize = xs::MESSAGE_HEADER_SIZE;
+
+	writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, headerOnlySize);
+	dst.flush();
+}
+
+// Queue an EXECUTE_BINARY message and flush dst.
+//
+// The payload is the four strings name, params, workDir and caseList, in that
+// order, each written together with its terminating NUL byte. The size stored
+// in the header covers the header plus all four strings. All arguments must be
+// valid NUL-terminated strings (they are passed to strlen()).
+static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
+{
+	enum { NUM_FIELDS = 4 };
+
+	const char* const	fields[NUM_FIELDS]		= { name, params, workDir, caseList };
+	int					fieldSizes[NUM_FIELDS];
+	int					totalSize				= xs::MESSAGE_HEADER_SIZE;
+
+	// Measure everything up front: the header needs the total before any payload is written.
+	for (int ndx = 0; ndx < NUM_FIELDS; ndx++)
+	{
+		fieldSizes[ndx]	 = (int)strlen(fields[ndx]) + 1;
+		totalSize		+= fieldSizes[ndx];
+	}
+
+	writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
+
+	for (int ndx = 0; ndx < NUM_FIELDS; ndx++)
+		dst.write(fieldSizes[ndx], (const deUint8*)fields[ndx]);
+
+	dst.flush();
+}
+
+// Queue a STOP_EXECUTION message, which consists of a header only, and flush dst.
+static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
+{
+	const int headerOnlySize = xs::MESSAGE_HEADER_SIZE;
+
+	writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, headerOnlySize);
+	dst.flush();
+}
+
+// TcpIpLinkState
+
+// Create link state holding the given initial state and error string. The
+// keepalive timestamp starts at zero and no callbacks or user pointer are set.
+TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
+	: m_state(initialState)
+	, m_error(initialErr)
+	, m_lastKeepaliveReceived(0)
+	, m_stateChangedCallback(DE_NULL)
+	, m_testLogDataCallback(DE_NULL)
+	, m_infoLogDataCallback(DE_NULL)
+	, m_userPtr(DE_NULL)
+{
+	// All members are set up by the initializer list.
+}
+
+// No explicit cleanup is performed here.
+TcpIpLinkState::~TcpIpLinkState (void)
+{
+}
+
+// Return the current link state; the read is done while holding m_lock.
+CommLinkState TcpIpLinkState::getState (void) const
+{
+	de::ScopedLock		guard	(m_lock);
+	const CommLinkState	current	= m_state;
+
+	return current;
+}
+
+// Return the current link state and copy the stored error string to 'error'.
+// Both values are read under the same lock acquisition.
+CommLinkState TcpIpLinkState::getState (std::string& error) const
+{
+	de::ScopedLock		guard	(m_lock);
+	const CommLinkState	current	= m_state;
+
+	error = m_error;
+
+	return current;
+}
+
+// Replace all three callbacks and the user pointer handed to them. The four
+// values are stored together while holding m_lock.
+void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
+{
+	de::ScopedLock guard(m_lock);
+
+	m_userPtr				= userPtr;
+	m_stateChangedCallback	= stateChangedCallback;
+	m_testLogDataCallback	= testLogDataCallback;
+	m_infoLogDataCallback	= infoLogDataCallback;
+}
+
+// Store a new state and error string, then notify the state-changed callback
+// if one is registered.
+//
+// The callback and user pointer are copied under m_lock, but the callback
+// itself is invoked only after the lock has been released.
+void TcpIpLinkState::setState (CommLinkState state, const char* error)
+{
+	CommLink::StateChangedFunc	notify		= DE_NULL;
+	void*						notifyArg	= DE_NULL;
+
+	{
+		de::ScopedLock guard(m_lock);
+
+		m_state		= state;
+		m_error		= error;
+
+		notifyArg	= m_userPtr;
+		notify		= m_stateChangedCallback;
+	}
+
+	if (!notify)
+		return;
+
+	notify(notifyArg, state, error);
+}
+
+// Forward a chunk of test log bytes to the registered test-log callback, if
+// any. The callback pointer is read under m_lock and called after unlocking.
+void TcpIpLinkState::onTestLogData (const deUint8* bytes, int numBytes) const
+{
+	CommLink::LogDataFunc	sink	= DE_NULL;
+	void*					sinkArg	= DE_NULL;
+
+	{
+		de::ScopedLock guard(m_lock);
+
+		sink	= m_testLogDataCallback;
+		sinkArg	= m_userPtr;
+	}
+
+	if (!sink)
+		return;
+
+	sink(sinkArg, bytes, numBytes);
+}
+
+// Forward a chunk of info log bytes to the registered info-log callback, if
+// any. The callback pointer is read under m_lock and called after unlocking.
+void TcpIpLinkState::onInfoLogData (const deUint8* bytes, int numBytes) const
+{
+	CommLink::LogDataFunc	sink	= DE_NULL;
+	void*					sinkArg	= DE_NULL;
+
+	{
+		de::ScopedLock guard(m_lock);
+
+		sink	= m_infoLogDataCallback;
+		sinkArg	= m_userPtr;
+	}
+
+	if (!sink)
+		return;
+
+	sink(sinkArg, bytes, numBytes);
+}
+
+// Record the current deGetMicroseconds() value as the time of the most recent
+// keepalive.
+void TcpIpLinkState::onKeepaliveReceived (void)
+{
+	de::ScopedLock	guard	(m_lock);
+	const deUint64	now		= deGetMicroseconds();
+
+	m_lastKeepaliveReceived = now;
+}
+
+// Return the timestamp stored by the last onKeepaliveReceived() call.
+deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
+{
+	de::ScopedLock	guard		(m_lock);
+	const deUint64	timestamp	= m_lastKeepaliveReceived;
+
+	return timestamp;
+}
+
+// TcpIpSendThread
+
+// Bind the send thread to a socket and the shared link state, and create its
+// outgoing buffer from the SEND_BUFFER_* constants. The thread is not started.
+TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
+	: m_socket(socket)
+	, m_state(state)
+	, m_buffer(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
+	, m_isRunning(false)
+{
+	// All members are set up by the initializer list.
+}
+
+// No explicit cleanup is performed here.
+TcpIpSendThread::~TcpIpSendThread (void)
+{
+}
+
+// Clear the outgoing buffer, mark the thread as running and launch it.
+// Must not be called while the thread is already marked as running.
+void TcpIpSendThread::start (void)
+{
+	DE_ASSERT(!m_isRunning);
+
+	// Start from a clean buffer before the worker begins draining it.
+	m_buffer.clear();
+
+	m_isRunning = true;
+	de::Thread::start();
+}
+
+// Send thread body: drain m_buffer to the socket until the buffer is canceled.
+//
+// Each round blocks for one byte, opportunistically grabs up to one block's
+// worth more, and then loops on send() until the whole chunk is written.
+// Any exception derived from std::exception (including the XE_FAIL errors
+// raised for socket failures) ends the thread and is reported by switching the
+// link state to COMMLINKSTATE_ERROR with the exception text.
+void TcpIpSendThread::run (void)
+{
+	try
+	{
+		deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
+
+		while (!m_buffer.isCanceled())
+		{
+			int numToSend	= 0;	// Bytes staged in buf for this round.
+			int numSent		= 0;	// Running total of bytes already written to the socket.
+
+			try
+			{
+				// Wait for single byte and then try to read more.
+				m_buffer.read(1, &buf[0]);
+				numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
+			}
+			catch (const de::BlockBuffer<deUint8>::CanceledException&)
+			{
+				// Handled in loop condition.
+			}
+
+			while (numSent < numToSend)
+			{
+				// \note send() reports the byte count of this call only, so it must go
+				//		 to a separate variable and be accumulated. Writing it straight
+				//		 into numSent would corrupt the offset after a partial send and
+				//		 could turn it negative on a failed call.
+				int						curSent	= 0;
+				const deSocketResult	result	= m_socket.send(&buf[numSent], numToSend-numSent, &curSent);
+
+				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
+					XE_FAIL("Connection closed");
+				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
+					XE_FAIL("Connection terminated");
+				else if (result == DE_SOCKETRESULT_ERROR)
+					XE_FAIL("Socket error");
+				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
+				{
+					// \note Socket should not be in non-blocking mode.
+					DE_ASSERT(curSent <= 0);
+					deYield();
+				}
+				else
+				{
+					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
+					DE_ASSERT(curSent <= numToSend-numSent);
+
+					// Only ever move forward; retry the remainder in the next iteration.
+					if (curSent > 0)
+						numSent += curSent;
+				}
+			}
+		}
+	}
+	catch (const std::exception& e)
+	{
+		m_state.setState(COMMLINKSTATE_ERROR, e.what());
+	}
+}
+
+// Stop the send thread if it is marked as running: cancel the buffer so that
+// run() leaves its loop, wait for the thread to exit, then clear the flag.
+void TcpIpSendThread::stop (void)
+{
+	if (!m_isRunning)
+		return;
+
+	m_buffer.cancel();
+	join();
+
+	m_isRunning = false;
+}
+
+// TcpIpRecvThread
+
+// Bind the receive thread to a socket and the shared link state. The message
+// write position starts at zero and the thread is not started.
+TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
+	: m_socket(socket)
+	, m_state(state)
+	, m_curMsgPos(0)
+	, m_isRunning(false)
+{
+	// All members are set up by the initializer list.
+}
+
+// No explicit cleanup is performed here.
+TcpIpRecvThread::~TcpIpRecvThread (void)
+{
+}
+
+// Rewind the message position, mark the thread as running and launch it.
+// Must not be called while the thread is already marked as running.
+void TcpIpRecvThread::start (void)
+{
+	DE_ASSERT(!m_isRunning);
+
+	// Discard any partially received message from a previous run.
+	m_curMsgPos = 0;
+
+	m_isRunning = true;
+	de::Thread::start();
+}
+
+// Receive thread body: assemble messages from the socket and dispatch them.
+//
+// m_curMsgBuf accumulates the bytes of the message currently being received
+// and m_curMsgPos counts how many of them have arrived. Each iteration either
+//  - hands a complete message (header + payload) to handleMessage() and
+//    rewinds m_curMsgPos, or
+//  - receives more bytes: up to the end of the header while the header is
+//    incomplete, otherwise up to the message size announced by the header.
+//
+// The loop has no normal exit; it ends when an exception derived from
+// std::exception is thrown (socket failure, invalid header, or an error from
+// handleMessage()), which is reported by switching the link state to
+// COMMLINKSTATE_ERROR with the exception text.
+void TcpIpRecvThread::run (void)
+{
+	try
+	{
+		for (;;)
+		{
+			bool				hasHeader		= m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
+			bool				hasPayload		= false;
+			int					messageSize		= 0;
+			xs::MessageType		messageType		= (xs::MessageType)0;
+
+			if (hasHeader)
+			{
+				xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
+
+				// The size comes from the peer. Anything smaller than the header would
+				// yield a negative payload size below, so reject it here.
+				// NOTE(review): there is no upper bound either; m_curMsgBuf is grown to
+				// whatever size the peer announces.
+				XE_CHECK_MSG(messageSize >= xs::MESSAGE_HEADER_SIZE, "Invalid message size");
+
+				hasPayload = m_curMsgPos >= messageSize;
+			}
+
+			if (hasPayload)
+			{
+				// Process message. A header-only message is passed on with a null payload pointer.
+				handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
+				m_curMsgPos = 0;
+			}
+			else
+			{
+				// Try to receive missing bytes.
+				int					curSize			= hasHeader ? messageSize : xs::MESSAGE_HEADER_SIZE;
+				int					bytesToRecv		= curSize-m_curMsgPos;
+				int					numRecv			= 0;
+				deSocketResult		result			= DE_SOCKETRESULT_LAST;
+
+				// Grow (never shrink) the buffer so that the requested bytes fit.
+				if ((int)m_curMsgBuf.size() < curSize)
+					m_curMsgBuf.resize(curSize);
+
+				result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
+
+				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
+					XE_FAIL("Connection closed");
+				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
+					XE_FAIL("Connection terminated");
+				else if (result == DE_SOCKETRESULT_ERROR)
+					XE_FAIL("Socket error");
+				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
+				{
+					// \note Socket should not be in non-blocking mode.
+					DE_ASSERT(numRecv <= 0);
+					deYield();
+				}
+				else
+				{
+					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
+					DE_ASSERT(numRecv <= bytesToRecv);
+					m_curMsgPos += numRecv;
+					// Continue receiving bytes / handle message in next iter.
+				}
+			}
+		}
+	}
+	catch (const std::exception& e)
+	{
+		m_state.setState(COMMLINKSTATE_ERROR, e.what());
+	}
+}
+
+// Wait for the receive thread to exit if it is marked as running.
+//
+// The receive side of the socket has to be closed before calling this; the
+// XE_CHECK below fails otherwise and the running flag is left untouched.
+void TcpIpRecvThread::stop (void)
+{
+	if (!m_isRunning)
+		return;
+
+	// \note Socket must be closed before terminating receive thread.
+	XE_CHECK(!m_socket.isReceiveOpen());
+
+	join();
+
+	m_isRunning = false;
+}
+
+// Dispatch one completely received message.
+//
+// \param messageType	Type parsed from the message header.
+// \param data			Payload bytes following the header. TcpIpRecvThread::run()
+//						passes DE_NULL when the message has no payload.
+// \param dataSize		Number of payload bytes.
+//
+// Messages of unknown type, and process messages arriving in a link state
+// where they are not expected, raise an error through XE_FAIL / XE_CHECK_MSG.
+void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, int dataSize)
+{
+	switch (messageType)
+	{
+		case xs::MESSAGETYPE_KEEPALIVE:
+			m_state.onKeepaliveReceived();
+			break;
+
+		case xs::MESSAGETYPE_PROCESS_STARTED:
+			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
+			m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
+			break;
+
+		case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
+		{
+			xs::ProcessLaunchFailedMessage msg(data, dataSize);
+			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
+			m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
+			break;
+		}
+
+		case xs::MESSAGETYPE_PROCESS_FINISHED:
+		{
+			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
+			xs::ProcessFinishedMessage msg(data, dataSize);
+			m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
+			DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
+			break;
+		}
+
+		case xs::MESSAGETYPE_PROCESS_LOG_DATA:
+		case xs::MESSAGETYPE_INFO:
+			// Ignore trailing \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
+			// \note An empty payload must not be inspected: data may be null and
+			//		 data[dataSize-1] would read before the start of the buffer.
+			if (dataSize > 0 && data[dataSize-1] == 0)
+				dataSize -= 1;
+
+			if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
+			{
+				XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
+				m_state.onTestLogData(data, dataSize);
+			}
+			else
+				m_state.onInfoLogData(data, dataSize);
+			break;
+
+		default:
+			XE_FAIL("Unknown message");
+	}
+}
+
+// TcpIpLink
+
+// Create a link in the "Not connected" error state. Both worker threads are
+// bound to this link's socket and state but not started. A keepalive timer
+// that calls keepaliveTimerCallback() with this object is created; failure to
+// create it is reported through XE_CHECK.
+TcpIpLink::TcpIpLink (void)
+	: m_state(COMMLINKSTATE_ERROR, "Not connected")
+	, m_sendThread(m_socket, m_state)
+	, m_recvThread(m_socket, m_state)
+	, m_keepaliveTimer(DE_NULL)
+{
+	m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
+
+	XE_CHECK(m_keepaliveTimer);
+}
+
+// Tear the connection down, swallowing any error it raises, and destroy the
+// keepalive timer.
+TcpIpLink::~TcpIpLink (void)
+{
+	try
+	{
+		closeConnection();
+	}
+	catch (...)
+	{
+		// Can't do much except to ignore error.
+	}
+
+	deTimer_destroy(m_keepaliveTimer);
+}
+
+// Tear down the connection. Every step is skipped when it does not apply, so
+// this can be called on a link that is only partially set up. Order:
+//  1. shut the socket down unless it is already disconnected or closed,
+//  2. disable the keepalive timer if it is active,
+//  3. stop the send thread, then the receive thread, if they are running,
+//  4. close the socket unless it is already closed.
+void TcpIpLink::closeConnection (void)
+{
+	const deSocketState	initialState	= m_socket.getState();
+	const bool			alreadyDown		= initialState == DE_SOCKETSTATE_DISCONNECTED || initialState == DE_SOCKETSTATE_CLOSED;
+
+	if (!alreadyDown)
+		m_socket.shutdown();
+
+	if (deTimer_isActive(m_keepaliveTimer))
+		deTimer_disable(m_keepaliveTimer);
+
+	if (m_sendThread.isRunning())
+		m_sendThread.stop();
+
+	if (m_recvThread.isRunning())
+		m_recvThread.stop();
+
+	// Re-query the state: it may have changed since the shutdown above.
+	if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
+		m_socket.close();
+}
+
+// Connect to the given address and bring the link up.
+//
+// Preconditions (enforced with XE_CHECK): the socket is closed, the link is in
+// the error state and neither worker thread is running.
+//
+// An error raised by the socket connect itself propagates to the caller. An
+// error raised afterwards, while starting the threads or scheduling the
+// keepalive timer, closes the connection again and is stored in the link
+// state as COMMLINKSTATE_ERROR instead.
+void TcpIpLink::connect (const de::SocketAddress& address)
+{
+	XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
+	XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
+	XE_CHECK(!m_sendThread.isRunning());
+	XE_CHECK(!m_recvThread.isRunning());
+
+	m_socket.connect(address);
+
+	try
+	{
+		// Clear error and set state to ready. Stamping a keepalive now starts
+		// the timeout window at connection time.
+		m_state.setState(COMMLINKSTATE_READY, "");
+		m_state.onKeepaliveReceived();
+
+		// Launch worker threads, sender first.
+		m_sendThread.start();
+		m_recvThread.start();
+
+		XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
+	}
+	catch (const std::exception& e)
+	{
+		// Undo whatever was set up, then record why bring-up failed.
+		closeConnection();
+		m_state.setState(COMMLINKSTATE_ERROR, e.what());
+	}
+}
+
+// Close the connection and put the link into the "Not connected" error state.
+// If closing fails with a std::exception, the link ends up in the error state
+// carrying that exception's text instead.
+void TcpIpLink::disconnect (void)
+{
+	try
+	{
+		closeConnection();
+		m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
+	}
+	catch (const std::exception& closeError)
+	{
+		m_state.setState(COMMLINKSTATE_ERROR, closeError.what());
+	}
+}
+
+// Reset the link. While the socket is connected this only clears the error
+// and returns the state to COMMLINKSTATE_READY; otherwise the link is
+// disconnected.
+void TcpIpLink::reset (void)
+{
+	if (m_socket.getState() != DE_SOCKETSTATE_CONNECTED)
+	{
+		// Abnormal state/usage. Disconnect socket.
+		disconnect();
+		return;
+	}
+
+	// \note Just clears error state if we are connected.
+	m_state.setState(COMMLINKSTATE_READY, "");
+
+	// \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
+}
+
+// Keepalive timer callback; ptr is the TcpIpLink given to deTimer_create().
+//
+// Flags a "Keepalive timeout" error when the time since the last received
+// keepalive exceeds xs::KEEPALIVE_TIMEOUT*1000 in deGetMicroseconds() units,
+// and then queues a keepalive of our own in either case.
+void TcpIpLink::keepaliveTimerCallback (void* ptr)
+{
+	TcpIpLink* const	self		= static_cast<TcpIpLink*>(ptr);
+	const deUint64		lastSeen	= self->m_state.getLastKeepaliveRecevied();
+	const deUint64		now			= deGetMicroseconds();
+	const deInt64		elapsed		= (deInt64)now-(deInt64)lastSeen;
+	const bool			timedOut	= elapsed > xs::KEEPALIVE_TIMEOUT*1000;
+
+	if (timedOut)
+		self->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
+
+	// Enqueue new keepalive.
+	try
+	{
+		writeKeepalive(self->m_sendThread.getBuffer());
+	}
+	catch (const de::BlockBuffer<deUint8>::CanceledException&)
+	{
+		// Ignore. Can happen in connection teardown.
+	}
+}
+
+// Return the current link state as held by the shared TcpIpLinkState.
+CommLinkState TcpIpLink::getState (void) const
+{
+	const CommLinkState current = m_state.getState();
+
+	return current;
+}
+
+// Return the current link state and copy the associated error text to 'message'.
+CommLinkState TcpIpLink::getState (std::string& message) const
+{
+	const CommLinkState current = m_state.getState(message);
+
+	return current;
+}
+
+// Register the state-change and log-data callbacks together with the user
+// pointer passed to them; everything is forwarded to the shared link state.
+void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
+{
+	m_state.setCallbacks(stateChangedCallback,
+						 testLogDataCallback,
+						 infoLogDataCallback,
+						 userPtr);
+}
+
+// Ask the remote side to launch a test binary.
+//
+// Requires the link to be in COMMLINKSTATE_READY (enforced with XE_CHECK).
+// The state is switched to COMMLINKSTATE_TEST_PROCESS_LAUNCHING before the
+// EXECUTE_BINARY message carrying the four strings is queued for sending.
+void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
+{
+	XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
+
+	m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
+
+	de::BlockBuffer<deUint8>& sendBuffer = m_sendThread.getBuffer();
+	writeExecuteBinary(sendBuffer, name, params, workingDir, caseList);
+}
+
+// Queue a STOP_EXECUTION message for the remote side. Not allowed while the
+// link is in the error state (enforced with XE_CHECK). The link state itself
+// is not modified here.
+void TcpIpLink::stopTestProcess (void)
+{
+	XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
+
+	de::BlockBuffer<deUint8>& sendBuffer = m_sendThread.getBuffer();
+	writeStopExecution(sendBuffer);
+}
+
+} // xe