blob: 6ede34d84f5be6afc989b23b6256d33411d91cf4 [file] [log] [blame]
Michael Butler60296322019-01-17 17:54:51 -08001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Michael Butler89e99ba2019-01-24 02:36:37 -080017#define LOG_TAG "ExecutionBurstServer"
18
Michael Butler60296322019-01-17 17:54:51 -080019#include "ExecutionBurstServer.h"
20
21#include <android-base/logging.h>
Michael Butler89e99ba2019-01-24 02:36:37 -080022#include <set>
23#include <string>
Michael Butler3db6fe52019-01-29 11:20:30 -080024#include "Tracing.h"
Michael Butler60296322019-01-17 17:54:51 -080025
Michael Butler3db6fe52019-01-29 11:20:30 -080026namespace android::nn {
Michael Butler60296322019-01-17 17:54:51 -080027
Michael Butler3db6fe52019-01-29 11:20:30 -080028ExecutionBurstServer::BurstMemoryCache::BurstMemoryCache(const sp<IBurstCallback>& callback)
29 : mCallback(callback) {}
Michael Butler60296322019-01-17 17:54:51 -080030
Michael Butler3db6fe52019-01-29 11:20:30 -080031hidl_vec<hidl_memory> ExecutionBurstServer::BurstMemoryCache::getMemories(
32 const std::vector<int32_t>& slots) {
Michael Butler60296322019-01-17 17:54:51 -080033 std::lock_guard<std::mutex> guard(mMutex);
34
35 // find unique unknown slots
Michael Butler89e99ba2019-01-24 02:36:37 -080036 std::set<int32_t> setOfUnknownSlots;
37 for (int32_t slot : slots) {
38 if (mSlotToMemoryCache.find(slot) == mSlotToMemoryCache.end()) {
39 setOfUnknownSlots.insert(slot);
40 }
41 }
Michael Butler3db6fe52019-01-29 11:20:30 -080042 const std::vector<int32_t> vecOfUnknownSlots(setOfUnknownSlots.begin(),
43 setOfUnknownSlots.end());
Michael Butler60296322019-01-17 17:54:51 -080044
45 // retrieve unknown slots
Michael Butler3db6fe52019-01-29 11:20:30 -080046 if (!vecOfUnknownSlots.empty()) {
Michael Butler89e99ba2019-01-24 02:36:37 -080047 ErrorStatus errorStatus = ErrorStatus::GENERAL_FAILURE;
48 std::vector<hidl_memory> returnedMemories;
49 Return<void> ret = mCallback->getMemories(
Michael Butler3db6fe52019-01-29 11:20:30 -080050 vecOfUnknownSlots,
51 [&errorStatus, &returnedMemories](ErrorStatus status,
52 const hidl_vec<hidl_memory>& memories) {
Michael Butler89e99ba2019-01-24 02:36:37 -080053 errorStatus = status;
54 if (status == ErrorStatus::NONE) {
55 returnedMemories = memories;
56 }
57 });
Michael Butler60296322019-01-17 17:54:51 -080058
Michael Butler89e99ba2019-01-24 02:36:37 -080059 if (!ret.isOk() || errorStatus != ErrorStatus::NONE) {
60 LOG(ERROR) << "Error retrieving memories";
61 return {};
62 }
Michael Butler60296322019-01-17 17:54:51 -080063
Michael Butler89e99ba2019-01-24 02:36:37 -080064 // add memories to unknown slots
Michael Butler3db6fe52019-01-29 11:20:30 -080065 for (size_t i = 0; i < vecOfUnknownSlots.size(); ++i) {
66 mSlotToMemoryCache[vecOfUnknownSlots[i]] = returnedMemories[i];
Michael Butler89e99ba2019-01-24 02:36:37 -080067 }
Michael Butler60296322019-01-17 17:54:51 -080068 }
69
70 // get all slots
71 hidl_vec<hidl_memory> memories(slots.size());
72 for (size_t i = 0; i < slots.size(); ++i) {
73 memories[i] = mSlotToMemoryCache[slots[i]];
74 }
Michael Butler89e99ba2019-01-24 02:36:37 -080075
Michael Butler60296322019-01-17 17:54:51 -080076 return memories;
77}
78
Michael Butler3db6fe52019-01-29 11:20:30 -080079void ExecutionBurstServer::BurstMemoryCache::freeMemory(int32_t slot) {
Michael Butler60296322019-01-17 17:54:51 -080080 std::lock_guard<std::mutex> guard(mMutex);
81 mSlotToMemoryCache.erase(slot);
82}
83
Michael Butler3db6fe52019-01-29 11:20:30 -080084sp<ExecutionBurstServer> ExecutionBurstServer::create(
85 const sp<IBurstCallback>& callback, const MQDescriptorSync<FmqRequestDatum>& requestChannel,
86 const MQDescriptorSync<FmqResultDatum>& resultChannel, IPreparedModel* preparedModel) {
87 // check inputs
88 if (callback == nullptr || preparedModel == nullptr) {
89 LOG(ERROR) << "ExecutionBurstServer::create passed a nullptr";
90 return nullptr;
91 }
92
93 // create FMQ objects
94 std::unique_ptr<FmqRequestChannel> fmqRequestChannel{new (std::nothrow)
95 FmqRequestChannel(requestChannel)};
96 std::unique_ptr<FmqResultChannel> fmqResultChannel{new (std::nothrow)
97 FmqResultChannel(resultChannel)};
98
99 // check FMQ objects
100 if (!fmqRequestChannel || !fmqResultChannel || !fmqRequestChannel->isValid() ||
101 !fmqResultChannel->isValid()) {
102 LOG(ERROR) << "ExecutionBurstServer::create failed to create FastMessageQueue";
103 return nullptr;
104 }
105
106 // make and return context
107 return new ExecutionBurstServer(callback, std::move(fmqRequestChannel),
108 std::move(fmqResultChannel), preparedModel);
109}
110
Michael Butler60296322019-01-17 17:54:51 -0800111ExecutionBurstServer::ExecutionBurstServer(const sp<IBurstCallback>& callback,
112 std::unique_ptr<FmqRequestChannel> requestChannel,
113 std::unique_ptr<FmqResultChannel> resultChannel,
114 IPreparedModel* preparedModel)
115 : mMemoryCache(callback),
116 mFmqRequestChannel(std::move(requestChannel)),
117 mFmqResultChannel(std::move(resultChannel)),
118 mPreparedModel(preparedModel),
119 mBlocking(mFmqRequestChannel->getEventFlagWord() != nullptr) {
120 // TODO: highly document the threading behavior of this class
121 mWorker = std::async(std::launch::async, [this] { task(); });
122}
123
124ExecutionBurstServer::~ExecutionBurstServer() {
125 // set teardown flag
126 mTeardown = true;
127
128 // force unblock
Michael Butler89e99ba2019-01-24 02:36:37 -0800129 // ExecutionBurstServer is by default waiting on a request packet. If the
130 // client process destroys its burst object, the server will still be
131 // waiting on the futex (assuming mBlocking is true). This force unblock
132 // wakes up any thread waiting on the futex.
Michael Butler60296322019-01-17 17:54:51 -0800133 if (mBlocking) {
Michael Butler89e99ba2019-01-24 02:36:37 -0800134 // TODO: look for a different/better way to signal/notify the futex to
135 // wake up any thread waiting on it
Michael Butler60296322019-01-17 17:54:51 -0800136 FmqRequestDatum datum;
137 datum.packetInformation({/*.packetSize=*/0, /*.numberOfInputOperands=*/0,
138 /*.numberOfOutputOperands=*/0, /*.numberOfPools=*/0});
139 mFmqRequestChannel->writeBlocking(&datum, 1);
140 }
141
142 // wait for task thread to end
143 mWorker.wait();
144}
145
146bool ExecutionBurstServer::sendPacket(const std::vector<FmqResultDatum>& packet) {
147 if (mTeardown) {
148 return false;
149 }
150
151 if (mBlocking) {
152 return mFmqResultChannel->writeBlocking(packet.data(), packet.size());
153 } else {
154 return mFmqResultChannel->write(packet.data(), packet.size());
155 }
156}
157
158std::vector<FmqRequestDatum> ExecutionBurstServer::getPacketBlocking() {
159 using discriminator = FmqRequestDatum::hidl_discriminator;
160
161 if (mTeardown) {
162 return {};
163 }
164
Michael Butler89e99ba2019-01-24 02:36:37 -0800165 // wait for request packet and read first element of request packet
166 // TODO: have a more elegant way to wait for data, and read it all at once.
167 // For example, EventFlag can be used to directly wait on the futex, and all
168 // the data can be read at once with a non-blocking call to
169 // MessageQueue::read. For further optimization, MessageQueue::beginRead and
170 // MessageQueue::commitRead can be used to avoid an extra copy of the
171 // metadata.
Michael Butler60296322019-01-17 17:54:51 -0800172 FmqRequestDatum datum;
173 bool success = false;
174 if (mBlocking) {
175 success = mFmqRequestChannel->readBlocking(&datum, 1);
176 } else {
177 while ((success = !mTeardown.load(std::memory_order_relaxed)) &&
178 !mFmqRequestChannel->read(&datum, 1)) {
179 }
180 }
181
182 // terminate loop
183 if (mTeardown) {
184 return {};
185 }
186
187 // validate packet information
188 if (!success || datum.getDiscriminator() != discriminator::packetInformation) {
189 LOG(ERROR) << "FMQ Request packet ill-formed";
190 return {};
191 }
192
Michael Butler3db6fe52019-01-29 11:20:30 -0800193 NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION, "ExecutionBurstServer getting packet");
194
Michael Butler60296322019-01-17 17:54:51 -0800195 // unpack packet information
196 const auto& packetInfo = datum.packetInformation();
197 const size_t count = packetInfo.packetSize;
198
199 // retrieve remaining elements
200 // NOTE: all of the data is already available at this point, so there's no
Michael Butler3db6fe52019-01-29 11:20:30 -0800201 // need to do a blocking wait to wait for more data. This is known because
202 // in FMQ, all writes are published (made available) atomically. Currently,
203 // the producer always publishes the entire packet in one function call, so
204 // if the first element of the packet is available, the remaining elements
205 // are also available.
Michael Butler60296322019-01-17 17:54:51 -0800206 std::vector<FmqRequestDatum> packet(count);
207 packet.front() = datum;
208 success = mFmqRequestChannel->read(packet.data() + 1, packet.size() - 1);
209
210 if (!success) {
211 return {};
212 }
213
214 return packet;
215}
216
217// deserialize request
218std::pair<Request, MeasureTiming> ExecutionBurstServer::deserialize(
219 const std::vector<FmqRequestDatum>& data) {
220 using discriminator = FmqRequestDatum::hidl_discriminator;
221
222 Request request;
223 size_t index = 0;
224
225 // validate packet information
226 if (data[index].getDiscriminator() != discriminator::packetInformation) {
227 LOG(ERROR) << "FMQ Request packet ill-formed";
228 return {{}, MeasureTiming::NO};
229 }
230
231 // unpackage packet information
232 const FmqRequestDatum::PacketInformation& packetInfo = data[index].packetInformation();
233 index++;
234 const uint32_t packetSize = packetInfo.packetSize;
235 const uint32_t numberOfInputOperands = packetInfo.numberOfInputOperands;
236 const uint32_t numberOfOutputOperands = packetInfo.numberOfOutputOperands;
237 const uint32_t numberOfPools = packetInfo.numberOfPools;
238
239 // unpackage input operands
240 std::vector<RequestArgument> inputs;
241 inputs.reserve(numberOfInputOperands);
242 for (size_t operand = 0; operand < numberOfInputOperands; ++operand) {
243 // validate input operand information
244 if (data[index].getDiscriminator() != discriminator::inputOperandInformation) {
245 LOG(ERROR) << "FMQ Request packet ill-formed";
246 return {{}, MeasureTiming::NO};
247 }
248
249 // unpackage operand information
250 const FmqRequestDatum::OperandInformation& operandInfo =
251 data[index].inputOperandInformation();
252 index++;
253 const bool hasNoValue = operandInfo.hasNoValue;
254 const DataLocation location = operandInfo.location;
255 const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
256
257 // unpackage operand dimensions
258 std::vector<uint32_t> dimensions;
259 dimensions.reserve(numberOfDimensions);
260 for (size_t i = 0; i < numberOfDimensions; ++i) {
261 // validate dimension
262 if (data[index].getDiscriminator() != discriminator::inputOperandDimensionValue) {
263 LOG(ERROR) << "FMQ Request packet ill-formed";
264 return {{}, MeasureTiming::NO};
265 }
266
267 // unpackage dimension
268 const uint32_t dimension = data[index].inputOperandDimensionValue();
269 index++;
270
271 // store result
272 dimensions.push_back(dimension);
273 }
274
275 // store result
276 inputs.push_back(
277 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
278 }
279
280 // unpackage output operands
281 std::vector<RequestArgument> outputs;
282 outputs.reserve(numberOfOutputOperands);
283 for (size_t operand = 0; operand < numberOfOutputOperands; ++operand) {
284 // validate output operand information
285 if (data[index].getDiscriminator() != discriminator::outputOperandInformation) {
286 LOG(ERROR) << "FMQ Request packet ill-formed";
287 return {{}, MeasureTiming::NO};
288 }
289
290 // unpackage operand information
291 const FmqRequestDatum::OperandInformation& operandInfo =
292 data[index].outputOperandInformation();
293 index++;
294 const bool hasNoValue = operandInfo.hasNoValue;
295 const DataLocation location = operandInfo.location;
296 const uint32_t numberOfDimensions = operandInfo.numberOfDimensions;
297
298 // unpackage operand dimensions
299 std::vector<uint32_t> dimensions;
300 dimensions.reserve(numberOfDimensions);
301 for (size_t i = 0; i < numberOfDimensions; ++i) {
302 // validate dimension
303 if (data[index].getDiscriminator() != discriminator::outputOperandDimensionValue) {
304 LOG(ERROR) << "FMQ Request packet ill-formed";
305 return {{}, MeasureTiming::NO};
306 }
307
308 // unpackage dimension
309 const uint32_t dimension = data[index].outputOperandDimensionValue();
310 index++;
311
312 // store result
313 dimensions.push_back(dimension);
314 }
315
316 // store result
317 outputs.push_back(
318 {/*.hasNoValue=*/hasNoValue, /*.location=*/location, /*.dimensions=*/dimensions});
319 }
320
321 // unpackage pools
322 std::vector<int32_t> slots;
323 slots.reserve(numberOfPools);
324 for (size_t pool = 0; pool < numberOfPools; ++pool) {
325 // validate input operand information
326 if (data[index].getDiscriminator() != discriminator::poolIdentifier) {
327 LOG(ERROR) << "FMQ Request packet ill-formed";
328 return {{}, MeasureTiming::NO};
329 }
330
331 // unpackage operand information
332 const int32_t poolId = data[index].poolIdentifier();
333 index++;
334
335 // store result
336 slots.push_back(poolId);
337 }
338 hidl_vec<hidl_memory> pools = mMemoryCache.getMemories(slots);
339
340 // validate measureTiming
341 if (data[index].getDiscriminator() != discriminator::measureTiming) {
342 LOG(ERROR) << "FMQ Request packet ill-formed";
343 return {{}, MeasureTiming::NO};
344 }
345
346 // unpackage measureTiming
347 const MeasureTiming measure = data[index].measureTiming();
348 index++;
349
350 // validate packet information
351 if (index != packetSize) {
352 LOG(ERROR) << "FMQ Result packet ill-formed";
353 return {{}, MeasureTiming::NO};
354 }
355
356 // return request
357 return {{/*.inputs=*/inputs, /*.outputs=*/outputs, /*.pools=*/std::move(pools)}, measure};
358}
359
360// serialize result
361std::vector<FmqResultDatum> ExecutionBurstServer::serialize(
362 ErrorStatus errorStatus, const std::vector<OutputShape>& outputShapes, Timing timing) {
363 // count how many elements need to be sent for a request
364 size_t count = 2 + outputShapes.size();
365 for (const auto& outputShape : outputShapes) {
366 count += outputShape.dimensions.size();
367 }
368
369 // create buffer to temporarily store elements
370 std::vector<FmqResultDatum> data;
371 data.reserve(count);
372
373 // package packetInfo
374 {
375 FmqResultDatum datum;
376 datum.packetInformation({/*.packetSize=*/static_cast<uint32_t>(count),
377 /*.errorStatus=*/errorStatus,
378 /*.numberOfOperands=*/static_cast<uint32_t>(outputShapes.size())});
379 data.push_back(datum);
380 }
381
382 // package output shape data
383 for (const auto& operand : outputShapes) {
384 // package operand information
385 FmqResultDatum datum;
386 datum.operandInformation(
387 {/*.isSufficient=*/operand.isSufficient,
388 /*.numberOfDimensions=*/static_cast<uint32_t>(operand.dimensions.size())});
389 data.push_back(datum);
390
391 // package operand dimensions
392 for (uint32_t dimension : operand.dimensions) {
393 FmqResultDatum datum;
394 datum.operandDimensionValue(dimension);
395 data.push_back(datum);
396 }
397 }
398
399 // package executionTiming
400 {
401 FmqResultDatum datum;
402 datum.executionTiming(timing);
403 data.push_back(datum);
404 }
405
406 // return result
407 return data;
408}
409
410Return<void> ExecutionBurstServer::freeMemory(int32_t slot) {
411 mMemoryCache.freeMemory(slot);
412 return Void();
413}
414
415void ExecutionBurstServer::task() {
416 while (!mTeardown) {
417 // receive request
418 const std::vector<FmqRequestDatum> requestData = getPacketBlocking();
419
420 // terminate loop
421 if (mTeardown) {
422 return;
423 }
424
Michael Butler3db6fe52019-01-29 11:20:30 -0800425 NNTRACE_FULL(NNTRACE_LAYER_IPC, NNTRACE_PHASE_EXECUTION,
426 "ExecutionBurstServer processing packet and returning results");
427
Michael Butler60296322019-01-17 17:54:51 -0800428 // continue processing
429 Request request;
430 MeasureTiming measure;
431 std::tie(request, measure) = deserialize(requestData);
432
433 // perform computation
434 ErrorStatus errorStatus = ErrorStatus::GENERAL_FAILURE;
435 std::vector<OutputShape> outputShapes;
436 Timing returnedTiming;
Michael Butler89e99ba2019-01-24 02:36:37 -0800437 // This call to IPreparedModel::executeSynchronously occurs entirely
438 // within the same process, so ignore the Return<> errors via .isOk().
439 // TODO: verify it is safe to always call isOk() here, or if there is
440 // any benefit to checking any potential errors.
Michael Butler60296322019-01-17 17:54:51 -0800441 mPreparedModel
442 ->executeSynchronously(request, measure,
443 [&errorStatus, &outputShapes, &returnedTiming](
444 ErrorStatus status,
445 const hidl_vec<OutputShape>& shapes, Timing timing) {
446 errorStatus = status;
447 outputShapes = shapes;
448 returnedTiming = timing;
449 })
450 .isOk();
451
452 // return result
453 const std::vector<FmqResultDatum> result =
454 serialize(errorStatus, outputShapes, returnedTiming);
455 sendPacket(result);
456 }
457}
458
Michael Butler3db6fe52019-01-29 11:20:30 -0800459} // namespace android::nn