blob: 1909693ecb165b531214ffdb2840c80657f33b9c [file] [log] [blame]
Lang Hames0b2331b2017-04-06 01:49:21 +00001//===----------------------- Queue.h - RPC Queue ------------------*-c++-*-===//
2//
Chandler Carruth2946cd72019-01-19 08:50:56 +00003// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4// See https://llvm.org/LICENSE.txt for license information.
5// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
Lang Hames0b2331b2017-04-06 01:49:21 +00006//
7//===----------------------------------------------------------------------===//
8
9#ifndef LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H
10#define LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H
11
12#include "llvm/ExecutionEngine/Orc/RawByteChannel.h"
13#include "llvm/Support/Error.h"
14
Lang Hamesf5593e22017-04-06 04:12:47 +000015#include <condition_variable>
Chandler Carruth9a67b072017-06-06 11:06:56 +000016#include <queue>
Lang Hames0b2331b2017-04-06 01:49:21 +000017
18namespace llvm {
19
20class QueueChannelError : public ErrorInfo<QueueChannelError> {
21public:
22 static char ID;
23};
24
25class QueueChannelClosedError
26 : public ErrorInfo<QueueChannelClosedError, QueueChannelError> {
27public:
28 static char ID;
29 std::error_code convertToErrorCode() const override {
30 return inconvertibleErrorCode();
31 }
32
33 void log(raw_ostream &OS) const override {
34 OS << "Queue closed";
35 }
36};
37
38class Queue : public std::queue<char> {
39public:
40 using ErrorInjector = std::function<Error()>;
41
42 Queue()
43 : ReadError([]() { return Error::success(); }),
44 WriteError([]() { return Error::success(); }) {}
45
46 Queue(const Queue&) = delete;
47 Queue& operator=(const Queue&) = delete;
48 Queue(Queue&&) = delete;
49 Queue& operator=(Queue&&) = delete;
50
51 std::mutex &getMutex() { return M; }
52 std::condition_variable &getCondVar() { return CV; }
53 Error checkReadError() { return ReadError(); }
54 Error checkWriteError() { return WriteError(); }
55 void setReadError(ErrorInjector NewReadError) {
56 {
57 std::lock_guard<std::mutex> Lock(M);
58 ReadError = std::move(NewReadError);
59 }
60 CV.notify_one();
61 }
62 void setWriteError(ErrorInjector NewWriteError) {
63 std::lock_guard<std::mutex> Lock(M);
64 WriteError = std::move(NewWriteError);
65 }
66private:
67 std::mutex M;
68 std::condition_variable CV;
69 std::function<Error()> ReadError, WriteError;
70};
71
72class QueueChannel : public orc::rpc::RawByteChannel {
73public:
74 QueueChannel(std::shared_ptr<Queue> InQueue,
75 std::shared_ptr<Queue> OutQueue)
76 : InQueue(InQueue), OutQueue(OutQueue) {}
77
78 QueueChannel(const QueueChannel&) = delete;
79 QueueChannel& operator=(const QueueChannel&) = delete;
80 QueueChannel(QueueChannel&&) = delete;
81 QueueChannel& operator=(QueueChannel&&) = delete;
82
Lang Hamesc1105112019-09-06 19:21:59 +000083 template <typename FunctionIdT, typename SequenceIdT>
84 Error startSendMessage(const FunctionIdT &FnId, const SequenceIdT &SeqNo) {
85 ++InFlightOutgoingMessages;
86 return orc::rpc::RawByteChannel::startSendMessage(FnId, SeqNo);
87 }
88
89 Error endSendMessage() {
90 --InFlightOutgoingMessages;
91 ++CompletedOutgoingMessages;
92 return orc::rpc::RawByteChannel::endSendMessage();
93 }
94
95 template <typename FunctionIdT, typename SequenceNumberT>
96 Error startReceiveMessage(FunctionIdT &FnId, SequenceNumberT &SeqNo) {
97 ++InFlightIncomingMessages;
98 return orc::rpc::RawByteChannel::startReceiveMessage(FnId, SeqNo);
99 }
100
101 Error endReceiveMessage() {
102 --InFlightIncomingMessages;
103 ++CompletedIncomingMessages;
104 return orc::rpc::RawByteChannel::endReceiveMessage();
105 }
106
Lang Hames0b2331b2017-04-06 01:49:21 +0000107 Error readBytes(char *Dst, unsigned Size) override {
108 std::unique_lock<std::mutex> Lock(InQueue->getMutex());
109 while (Size) {
110 {
111 Error Err = InQueue->checkReadError();
112 while (!Err && InQueue->empty()) {
113 InQueue->getCondVar().wait(Lock);
114 Err = InQueue->checkReadError();
115 }
116 if (Err)
117 return Err;
118 }
119 *Dst++ = InQueue->front();
120 --Size;
121 ++NumRead;
122 InQueue->pop();
123 }
124 return Error::success();
125 }
126
127 Error appendBytes(const char *Src, unsigned Size) override {
128 std::unique_lock<std::mutex> Lock(OutQueue->getMutex());
129 while (Size--) {
130 if (Error Err = OutQueue->checkWriteError())
131 return Err;
132 OutQueue->push(*Src++);
133 ++NumWritten;
134 }
135 OutQueue->getCondVar().notify_one();
136 return Error::success();
137 }
138
Lang Hamesc1105112019-09-06 19:21:59 +0000139 Error send() override {
140 ++SendCalls;
141 return Error::success();
142 }
Lang Hames0b2331b2017-04-06 01:49:21 +0000143
144 void close() {
145 auto ChannelClosed = []() { return make_error<QueueChannelClosedError>(); };
146 InQueue->setReadError(ChannelClosed);
147 InQueue->setWriteError(ChannelClosed);
148 OutQueue->setReadError(ChannelClosed);
149 OutQueue->setWriteError(ChannelClosed);
150 }
151
152 uint64_t NumWritten = 0;
153 uint64_t NumRead = 0;
Lang Hamesc1105112019-09-06 19:21:59 +0000154 std::atomic<size_t> InFlightIncomingMessages{0};
155 std::atomic<size_t> CompletedIncomingMessages{0};
156 std::atomic<size_t> InFlightOutgoingMessages{0};
157 std::atomic<size_t> CompletedOutgoingMessages{0};
158 std::atomic<size_t> SendCalls{0};
Lang Hames0b2331b2017-04-06 01:49:21 +0000159
160private:
161
162 std::shared_ptr<Queue> InQueue;
163 std::shared_ptr<Queue> OutQueue;
164};
165
166inline std::pair<std::unique_ptr<QueueChannel>, std::unique_ptr<QueueChannel>>
167createPairedQueueChannels() {
168 auto Q1 = std::make_shared<Queue>();
169 auto Q2 = std::make_shared<Queue>();
Jonas Devlieghere0eaee542019-08-15 15:54:37 +0000170 auto C1 = std::make_unique<QueueChannel>(Q1, Q2);
171 auto C2 = std::make_unique<QueueChannel>(Q2, Q1);
Lang Hames0b2331b2017-04-06 01:49:21 +0000172 return std::make_pair(std::move(C1), std::move(C2));
173}
174
175}
176
177#endif